[improvement](build index)Optimize failed task check on same tablet (#42295) (#43589)

bp #42295

Co-authored-by: qidaye <luen@selectdb.com>
This commit is contained in:
qiye
2024-11-12 09:56:55 +08:00
committed by GitHub
parent 541284ec21
commit 4c224bb1dd
11 changed files with 624 additions and 32 deletions

View File

@ -69,6 +69,7 @@ namespace ErrorCode {
TStatusError(HTTP_ERROR, true); \
TStatusError(TABLET_MISSING, true); \
TStatusError(NOT_MASTER, true); \
TStatusError(OBTAIN_LOCK_FAILED, false); \
TStatusError(DELETE_BITMAP_LOCK_ERROR, false);
// E error_name, error_code, print_stacktrace
#define APPLY_FOR_OLAP_ERROR_CODES(E) \
@ -479,6 +480,7 @@ public:
ERROR_CTOR(HttpError, HTTP_ERROR)
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR(CgroupError, CGROUP_ERROR)
ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED)
#undef ERROR_CTOR
template <int code>

View File

@ -645,37 +645,41 @@ Status IndexBuilder::do_build_inverted_index() {
std::unique_lock<std::mutex> schema_change_lock(_tablet->get_schema_change_lock(),
std::try_to_lock);
if (!schema_change_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try schema_change_lock failed");
return Status::ObtainLockFailed("try schema_change_lock failed. tablet={} ",
_tablet->tablet_id());
}
// Check executing serially with compaction task.
std::unique_lock<std::mutex> base_compaction_lock(_tablet->get_base_compaction_lock(),
std::try_to_lock);
if (!base_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try base_compaction_lock failed");
return Status::ObtainLockFailed("try base_compaction_lock failed. tablet={} ",
_tablet->tablet_id());
}
std::unique_lock<std::mutex> cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(),
std::try_to_lock);
if (!cumu_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try cumu_compaction_lock failed");
return Status::ObtainLockFailed("try cumu_compaction_lock failed. tablet={}",
_tablet->tablet_id());
}
std::unique_lock<std::mutex> cold_compaction_lock(_tablet->get_cold_compaction_lock(),
std::try_to_lock);
if (!cold_compaction_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("try cold_compaction_lock failed");
return Status::ObtainLockFailed("try cold_compaction_lock failed. tablet={}",
_tablet->tablet_id());
}
std::unique_lock<std::mutex> build_inverted_index_lock(_tablet->get_build_inverted_index_lock(),
std::try_to_lock);
if (!build_inverted_index_lock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>(
"failed to obtain build inverted index lock. tablet={}", _tablet->tablet_id());
return Status::ObtainLockFailed("failed to obtain build inverted index lock. tablet={}",
_tablet->tablet_id());
}
std::shared_lock migration_rlock(_tablet->get_migration_lock(), std::try_to_lock);
if (!migration_rlock.owns_lock()) {
return Status::Error<ErrorCode::TRY_LOCK_FAILED>("got migration_rlock failed. tablet={}",
_tablet->tablet_id());
return Status::ObtainLockFailed("got migration_rlock failed. tablet={}",
_tablet->tablet_id());
}
_input_rowsets =

View File

@ -42,12 +42,12 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterInvertedIndexTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -56,12 +56,12 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class IndexChangeJob implements Writable {
private static final Logger LOG = LogManager.getLogger(IndexChangeJob.class);
static final int MAX_FAILED_NUM = 10;
static final int MIN_FAILED_NUM = 3;
public enum JobState {
// CHECKSTYLE OFF
@ -109,9 +109,6 @@ public class IndexChangeJob implements Writable {
private long originIndexId;
@SerializedName(value = "invertedIndexBatchTask")
AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
// save failed task after retry three times, tablet -> backends
@SerializedName(value = "failedTabletBackends")
protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
@SerializedName(value = "timeoutMs")
protected long timeoutMs = -1;
@ -344,7 +341,9 @@ public class IndexChangeJob implements Writable {
LOG.info("invertedIndexBatchTask:{}", invertedIndexBatchTask);
AgentTaskQueue.addBatchTask(invertedIndexBatchTask);
AgentTaskExecutor.submit(invertedIndexBatchTask);
if (!FeConstants.runningUnitTest) {
AgentTaskExecutor.submit(invertedIndexBatchTask);
}
} finally {
olapTable.readUnlock();
}
@ -359,9 +358,8 @@ public class IndexChangeJob implements Writable {
// and the job will be in RUNNING state forever.
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
OlapTable tbl;
try {
tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
db.getTableOrMetaException(tableId, TableType.OLAP);
} catch (MetaNotFoundException e) {
throw new AlterCancelException(e.getMessage());
}
@ -370,18 +368,19 @@ public class IndexChangeJob implements Writable {
LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId);
List<AgentTask> tasks = invertedIndexBatchTask.getUnfinishedTasks(2000);
for (AgentTask task : tasks) {
if (task.getFailedTimes() > 3) {
if (task.getFailedTimes() >= MIN_FAILED_NUM) {
LOG.warn("alter inverted index task failed: " + task.getErrorMsg());
List<Long> failedBackends = failedTabletBackends.computeIfAbsent(task.getTabletId(),
k -> Lists.newArrayList());
failedBackends.add(task.getBackendId());
int expectSucceedTaskNum = tbl.getPartitionInfo()
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
int failedTaskCount = failedBackends.size();
if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
throw new AlterCancelException("inverted index tasks failed on same tablet reach threshold "
+ failedTaskCount);
// If error is obtaining lock failed.
// we should do more tries.
if (task.getErrorCode().equals(TStatusCode.OBTAIN_LOCK_FAILED)) {
if (task.getFailedTimes() < MAX_FAILED_NUM) {
continue;
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MAX_FAILED_NUM + ", error: " + task.getErrorMsg());
}
throw new AlterCancelException("inverted index tasks failed times reach threshold "
+ MIN_FAILED_NUM + ", error: " + task.getErrorMsg());
}
}
return;
@ -390,7 +389,9 @@ public class IndexChangeJob implements Writable {
this.jobState = JobState.FINISHED;
this.finishedTimeMs = System.currentTimeMillis();
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
}
LOG.info("inverted index job finished: {}", jobId);
}
@ -408,7 +409,9 @@ public class IndexChangeJob implements Writable {
jobState = JobState.CANCELLED;
this.errMsg = errMsg;
this.finishedTimeMs = System.currentTimeMillis();
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(this);
}
LOG.info("cancel index job {}, err: {}", jobId, errMsg);
return true;
}

View File

@ -2764,7 +2764,9 @@ public class SchemaChangeHandler extends AlterHandler {
if (LOG.isDebugEnabled()) {
LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info);
}
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
}
// Drop table column stats after light schema change finished.
Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable);
@ -3021,7 +3023,9 @@ public class SchemaChangeHandler extends AlterHandler {
addIndexChangeJob(indexChangeJob);
// write edit log
Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob);
if (!FeConstants.runningUnitTest) {
Env.getCurrentEnv().getEditLog().logIndexChangeJob(indexChangeJob);
}
LOG.info("finish create table's inverted index job. table: {}, partition: {}, job: {}",
olapTable.getName(), partitionName, jobId);
} // end for partition

View File

@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@ -74,6 +75,9 @@ public class CancelAlterTableStmt extends CancelStmt {
// disallow external catalog
Util.prohibitExternalCatalog(dbTableName.getCtl(), this.getClass().getSimpleName());
if (FeConstants.runningUnitTest) {
return;
}
// check access
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(ConnectContext.get(), dbTableName.getCtl(), dbTableName.getDb(),

View File

@ -140,6 +140,7 @@ public class MasterImpl {
+ (taskStatus.isSetErrorMsgs() ? (", status_message: " + taskStatus.getErrorMsgs()) : "")
+ ", backendId: " + backend + ", signature: " + signature;
task.setErrorMsg(errMsg);
task.setErrorCode(taskStatus.getStatusCode());
// We start to let FE perceive the task's error msg
if (taskType != TTaskType.MAKE_SNAPSHOT && taskType != TTaskType.UPLOAD
&& taskType != TTaskType.DOWNLOAD && taskType != TTaskType.MOVE

View File

@ -19,6 +19,7 @@ package org.apache.doris.task;
import org.apache.doris.common.Config;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;
public abstract class AgentTask {
@ -36,6 +37,7 @@ public abstract class AgentTask {
protected int failedTimes;
protected String errorMsg;
protected TStatusCode errorCode;
// some of process may use this member to check if the task is finished.
// some of are not.
// so whether the task is finished depends on caller's logic, not the value of this member.
@ -126,6 +128,14 @@ public abstract class AgentTask {
return errorMsg;
}
public TStatusCode getErrorCode() {
return errorCode;
}
public void setErrorCode(TStatusCode errorCode) {
this.errorCode = errorCode;
}
public void setFinished(boolean isFinished) {
this.isFinished = isFinished;
}

View File

@ -156,7 +156,7 @@ public class AgentTaskQueue {
// this is just for unit test
public static synchronized List<AgentTask> getTask(TTaskType type) {
List<AgentTask> res = Lists.newArrayList();
for (Map<Long, AgentTask> agentTasks : tasks.column(TTaskType.ALTER).values()) {
for (Map<Long, AgentTask> agentTasks : tasks.column(type).values()) {
res.addAll(agentTasks.values());
}
return res;

View File

@ -0,0 +1,555 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BuildIndexClause;
import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CreateIndexClause;
import org.apache.doris.analysis.DropIndexClause;
import org.apache.doris.analysis.IndexDef;
import org.apache.doris.analysis.ShowAlterStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.CatalogTestUtil;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FakeEditLog;
import org.apache.doris.catalog.FakeEnv;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Partition.PartitionState;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TableProperty;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.transaction.FakeTransactionIDGenerator;
import org.apache.doris.transaction.GlobalTransactionMgr;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class IndexChangeJobTest {
private static String fileName = "./IndexChangeJobTest";
private static FakeEditLog fakeEditLog;
private static FakeEnv fakeEnv;
private static FakeTransactionIDGenerator fakeTransactionIDGenerator;
private static GlobalTransactionMgr masterTransMgr;
private static GlobalTransactionMgr slaveTransMgr;
private static Env masterEnv;
private static Env slaveEnv;
private static Analyzer analyzer;
private static Database db;
private static OlapTable olapTable;
private static CreateIndexClause createIndexClause;
private static BuildIndexClause buildIndexClause;
private static DropIndexClause dropIndexClause;
private static CancelAlterTableStmt cancelAlterTableStmt;
@Rule
public ExpectedException expectedEx = ExpectedException.none();
@Before
public void setUp()
throws InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException,
NoSuchMethodException, SecurityException, UserException {
FeConstants.runningUnitTest = true;
FakeEnv.setMetaVersion(FeMetaVersion.VERSION_CURRENT);
fakeEditLog = new FakeEditLog();
fakeEnv = new FakeEnv();
fakeTransactionIDGenerator = new FakeTransactionIDGenerator();
masterEnv = CatalogTestUtil.createTestCatalog();
slaveEnv = CatalogTestUtil.createTestCatalog();
masterTransMgr = (GlobalTransactionMgr) masterEnv.getGlobalTransactionMgr();
masterTransMgr.setEditLog(masterEnv.getEditLog());
slaveTransMgr = (GlobalTransactionMgr) slaveEnv.getGlobalTransactionMgr();
slaveTransMgr.setEditLog(slaveEnv.getEditLog());
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
db = masterEnv.getInternalCatalog().getDbOrDdlException(CatalogTestUtil.testDbId1);
olapTable = (OlapTable) db.getTableOrDdlException(CatalogTestUtil.testTableId1);
// set mow table property
Map<String, String> properties = Maps.newHashMap();
properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "false");
TableProperty tableProperty = new TableProperty(properties);
olapTable.setTableProperty(tableProperty);
TableName tableName = new TableName(masterEnv.getInternalCatalog().getName(), db.getName(), olapTable.getName());
IndexDef indexDef = new IndexDef("index1", false, Lists.newArrayList(olapTable.getBaseSchema().get(1).getName()), IndexDef.IndexType.INVERTED, Maps.newHashMap(), "balabala");
createIndexClause = new CreateIndexClause(tableName, indexDef, false);
createIndexClause.analyze(analyzer);
buildIndexClause = new BuildIndexClause(tableName, indexDef, false);
buildIndexClause.analyze(analyzer);
dropIndexClause = new DropIndexClause("index1", false, tableName, false);
dropIndexClause.analyze(analyzer);
cancelAlterTableStmt = new CancelAlterTableStmt(ShowAlterStmt.AlterType.INDEX, tableName);
cancelAlterTableStmt.analyze(analyzer);
AgentTaskQueue.clearAllTasks();
}
@Test
public void testCreateIndexIndexChange() throws UserException {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(0, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
}
@Test
public void testBuildIndexIndexChange() throws UserException {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
alterClauses.clear();
alterClauses.add(buildIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(1, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
}
@Test
public void testDropIndexIndexChange() throws UserException {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
alterClauses.clear();
alterClauses.add(dropIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(1, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
Assert.assertEquals(olapTable.getIndexes().size(), 0);
}
@Test
// start a build index job, then normally finish it
public void testBuildIndexIndexChangeNormal() throws UserException {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
alterClauses.clear();
alterClauses.add(buildIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(1, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get();
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0);
Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState());
// run waiting txn job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3);
// run running job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
// finish alter tasks
List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX);
Assert.assertEquals(3, tasks.size());
for (AgentTask agentTask : tasks) {
agentTask.setFinished(true);
}
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.FINISHED, indexChangejob.getJobState());
}
@Test
// start a drop index job, then normally finish it
public void testDropIndexIndexChangeNormal() throws UserException {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
alterClauses.clear();
alterClauses.add(dropIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(1, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get();
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0);
Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState());
// run waiting txn job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3);
// run running job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
// finish alter tasks
List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX);
Assert.assertEquals(3, tasks.size());
for (AgentTask agentTask : tasks) {
agentTask.setFinished(true);
}
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.FINISHED, indexChangejob.getJobState());
}
@Test
public void testCancelBuildIndexIndexChangeNormal() throws UserException {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
alterClauses.clear();
alterClauses.add(buildIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(1, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get();
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0);
Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState());
// run waiting txn job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3);
// run running job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
// cancel build index job
schemaChangeHandler.cancel(cancelAlterTableStmt);
List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX);
Assert.assertEquals(0, tasks.size());
Assert.assertEquals(IndexChangeJob.JobState.CANCELLED, indexChangejob.getJobState());
}
@Test
public void testBuildIndexIndexChangeWhileTableNotStable() throws Exception {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
olapTable.setState(OlapTableState.SCHEMA_CHANGE);
expectedEx.expect(DdlException.class);
expectedEx.expectMessage("errCode = 2, detailMessage = Table[testTable1]'s state(SCHEMA_CHANGE) is not NORMAL. Do not allow doing ALTER ops");
schemaChangeHandler.process(alterClauses, db, olapTable);
olapTable.setState(OlapTableState.NORMAL);
schemaChangeHandler.process(alterClauses, db, olapTable);
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
alterClauses.clear();
alterClauses.add(buildIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(1, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get();
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0);
Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
MaterializedIndex baseIndex = testPartition.getBaseIndex();
Assert.assertEquals(IndexState.NORMAL, baseIndex.getState());
Assert.assertEquals(PartitionState.NORMAL, testPartition.getState());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
Tablet baseTablet = baseIndex.getTablets().get(0);
List<Replica> replicas = baseTablet.getReplicas();
Replica replica2 = replicas.get(1);
Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState());
// run waiting txn job, set replica2 to clone
replica2.setState(Replica.ReplicaState.CLONE);
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState());
// rerun waiting txn job, set replica2 to normal
replica2.setState(Replica.ReplicaState.NORMAL);
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3);
// run running job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
// finish alter tasks
List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX);
Assert.assertEquals(3, tasks.size());
for (AgentTask agentTask : tasks) {
agentTask.setFinished(true);
}
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.FINISHED, indexChangejob.getJobState());
}
@Test
public void testDropIndexIndexChangeWhileTableNotStable() throws Exception {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
olapTable.setState(OlapTableState.SCHEMA_CHANGE);
expectedEx.expect(DdlException.class);
expectedEx.expectMessage("errCode = 2, detailMessage = Table[testTable1]'s state(SCHEMA_CHANGE) is not NORMAL. Do not allow doing ALTER ops");
schemaChangeHandler.process(alterClauses, db, olapTable);
olapTable.setState(OlapTableState.NORMAL);
schemaChangeHandler.process(alterClauses, db, olapTable);
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
alterClauses.clear();
alterClauses.add(dropIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(1, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get();
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0);
Partition testPartition = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
MaterializedIndex baseIndex = testPartition.getBaseIndex();
Assert.assertEquals(IndexState.NORMAL, baseIndex.getState());
Assert.assertEquals(PartitionState.NORMAL, testPartition.getState());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
Tablet baseTablet = baseIndex.getTablets().get(0);
List<Replica> replicas = baseTablet.getReplicas();
Replica replica2 = replicas.get(1);
Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState());
// run waiting txn job, set replica2 to clone
replica2.setState(Replica.ReplicaState.CLONE);
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState());
// rerun waiting txn job, set replica2 to normal
replica2.setState(Replica.ReplicaState.NORMAL);
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3);
// run running job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
// finish alter tasks
List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX);
Assert.assertEquals(3, tasks.size());
for (AgentTask agentTask : tasks) {
agentTask.setFinished(true);
}
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.FINISHED, indexChangejob.getJobState());
}
@Test
public void testBuildIndexFailedWithMinFailedNum() throws Exception {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
alterClauses.clear();
alterClauses.add(buildIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(1, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get();
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0);
Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState());
// run waiting txn job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3);
// run running job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX);
Assert.assertEquals(3, tasks.size());
// if one task failed, the job should be failed
// if task error is not OBTAIN_LOCK_FAILED, the job should be failed after MIN_FAILED_NUM = 3 times
AgentTask agentTask = tasks.get(0);
agentTask.setErrorCode(TStatusCode.IO_ERROR);
Assert.assertEquals(agentTask.getFailedTimes(), 0);
for (int i = 0; i < IndexChangeJob.MIN_FAILED_NUM; i++) {
agentTask.failed();
schemaChangeHandler.runAfterCatalogReady();
if (i < IndexChangeJob.MIN_FAILED_NUM - 1) {
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
}
}
Assert.assertEquals(IndexChangeJob.JobState.CANCELLED, indexChangejob.getJobState());
}
@Test
public void testBuildIndexFailedWithMaxFailedNum() throws Exception {
fakeEnv = new FakeEnv();
fakeEditLog = new FakeEditLog();
FakeEnv.setEnv(masterEnv);
SchemaChangeHandler schemaChangeHandler = Env.getCurrentEnv().getSchemaChangeHandler();
ArrayList<AlterClause> alterClauses = new ArrayList<>();
alterClauses.add(createIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Assert.assertEquals(olapTable.getIndexes().size(), 1);
Assert.assertEquals(olapTable.getIndexes().get(0).getIndexName(), "index1");
alterClauses.clear();
alterClauses.add(buildIndexClause);
schemaChangeHandler.process(alterClauses, db, olapTable);
Map<Long, IndexChangeJob> indexChangeJobMap = schemaChangeHandler.getIndexChangeJobs();
Assert.assertEquals(1, indexChangeJobMap.size());
Assert.assertEquals(OlapTableState.NORMAL, olapTable.getState());
IndexChangeJob indexChangejob = indexChangeJobMap.values().stream().findAny().get();
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 0);
Assert.assertEquals(IndexChangeJob.JobState.WAITING_TXN, indexChangejob.getJobState());
// run waiting txn job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
Assert.assertEquals(indexChangejob.invertedIndexBatchTask.getTaskNum(), 3);
// run running job
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
schemaChangeHandler.runAfterCatalogReady();
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
List<AgentTask> tasks = AgentTaskQueue.getTask(TTaskType.ALTER_INVERTED_INDEX);
Assert.assertEquals(3, tasks.size());
// if one task failed, the job should be failed
// if task error is OBTAIN_LOCK_FAILED, the job should be failed after MAX_FAILED_NUM = 10 times
AgentTask agentTask = tasks.get(0);
agentTask.setErrorCode(TStatusCode.OBTAIN_LOCK_FAILED);
Assert.assertEquals(agentTask.getFailedTimes(), 0);
for (int i = 0; i < IndexChangeJob.MAX_FAILED_NUM; i++) {
agentTask.failed();
schemaChangeHandler.runAfterCatalogReady();
if (i < IndexChangeJob.MAX_FAILED_NUM - 1) {
Assert.assertEquals(IndexChangeJob.JobState.RUNNING, indexChangejob.getJobState());
}
}
Assert.assertEquals(IndexChangeJob.JobState.CANCELLED, indexChangejob.getJobState());
}
}

View File

@ -89,5 +89,12 @@ public class CancelAlterStmtTest {
Assert.assertEquals("CANCEL ALTER ROLLUP FROM `testDb`.`testTbl`", stmt.toString());
Assert.assertEquals("testDb", stmt.getDbName());
Assert.assertEquals(AlterType.ROLLUP, stmt.getAlterType());
stmt = new CancelAlterTableStmt(AlterType.INDEX,
new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, null, "testTbl"));
stmt.analyze(analyzer);
Assert.assertEquals("CANCEL ALTER INDEX FROM `testDb`.`testTbl`", stmt.toString());
Assert.assertEquals("testDb", stmt.getDbName());
Assert.assertEquals(AlterType.INDEX, stmt.getAlterType());
}
}

View File

@ -103,6 +103,8 @@ enum TStatusCode {
NOT_MASTER = 73,
OBTAIN_LOCK_FAILED = 74,
// used for cloud
DELETE_BITMAP_LOCK_ERROR = 100,
// Not be larger than 200, see status.h