[improvement](build index)Make build index and clone mutually exclusive and add timeout for index change job (#36293)
Currently the index change job and clone task can be executed at the same time. If the clone task gets stuck at this point, it will cause the index change job to get stuck as well and keep retrying. To solve this problem, we can refer to alter job and make index change job exclusive with clone task, and introduce the timeout to prevent infinite retries of build index. Add the following checks and status in FE. 1. Check if table is stable (build index is not allowed when clone is in progress) 1.1. Tablet is HEALTHY. 1.2. Whether the tablet is included in the Tablet scheduler, if so, it means the current tablet is doing clone. 2. When creating the index change job, set the timeout at the same time. pick from master #35724
This commit is contained in:
@ -37,6 +37,7 @@ import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.task.AgentBatchTask;
|
||||
import org.apache.doris.task.AgentTask;
|
||||
import org.apache.doris.task.AgentTaskExecutor;
|
||||
import org.apache.doris.task.AgentTaskQueue;
|
||||
import org.apache.doris.task.AlterInvertedIndexTask;
|
||||
@ -46,6 +47,7 @@ import org.apache.doris.thrift.TTaskType;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -54,6 +56,7 @@ import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
public class IndexChangeJob implements Writable {
|
||||
@ -106,6 +109,11 @@ public class IndexChangeJob implements Writable {
|
||||
private long originIndexId;
|
||||
@SerializedName(value = "invertedIndexBatchTask")
|
||||
AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
|
||||
// save failed task after retry three times, tablet -> backends
|
||||
@SerializedName(value = "failedTabletBackends")
|
||||
protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
|
||||
@SerializedName(value = "timeoutMs")
|
||||
protected long timeoutMs = -1;
|
||||
|
||||
public IndexChangeJob() {
|
||||
this.jobId = -1;
|
||||
@ -117,7 +125,7 @@ public class IndexChangeJob implements Writable {
|
||||
this.jobState = JobState.WAITING_TXN;
|
||||
}
|
||||
|
||||
public IndexChangeJob(long jobId, long dbId, long tableId, String tableName) {
|
||||
public IndexChangeJob(long jobId, long dbId, long tableId, String tableName, long timeoutMs) {
|
||||
this.jobId = jobId;
|
||||
this.dbId = dbId;
|
||||
this.tableId = tableId;
|
||||
@ -127,6 +135,7 @@ public class IndexChangeJob implements Writable {
|
||||
this.jobState = JobState.WAITING_TXN;
|
||||
this.watershedTxnId = Env.getCurrentGlobalTransactionMgr()
|
||||
.getTransactionIDGenerator().getNextTransactionId();
|
||||
this.timeoutMs = timeoutMs;
|
||||
}
|
||||
|
||||
public long getJobId() {
|
||||
@ -207,6 +216,10 @@ public class IndexChangeJob implements Writable {
|
||||
this.finishedTimeMs = finishedTimeMs;
|
||||
}
|
||||
|
||||
public boolean isTimeout() {
|
||||
return System.currentTimeMillis() - createTimeMs > timeoutMs;
|
||||
}
|
||||
|
||||
/**
|
||||
* The keyword 'synchronized' only protects 2 methods:
|
||||
* run() and cancel()
|
||||
@ -218,6 +231,10 @@ public class IndexChangeJob implements Writable {
|
||||
* db lock
|
||||
*/
|
||||
public synchronized void run() {
|
||||
if (isTimeout()) {
|
||||
cancelImpl("Timeout");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
switch (jobState) {
|
||||
case WAITING_TXN:
|
||||
@ -238,6 +255,31 @@ public class IndexChangeJob implements Writable {
|
||||
return cancelImpl(errMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* should be called before executing the job.
|
||||
* return false if table is not stable.
|
||||
*/
|
||||
protected boolean checkTableStable(OlapTable tbl) throws AlterCancelException {
|
||||
tbl.writeLockOrAlterCancelException();
|
||||
try {
|
||||
boolean isStable = tbl.isStable(Env.getCurrentSystemInfo(),
|
||||
Env.getCurrentEnv().getTabletScheduler());
|
||||
|
||||
if (!isStable) {
|
||||
errMsg = "table is unstable";
|
||||
LOG.warn("wait table {} to be stable before doing index change job", tableId);
|
||||
return false;
|
||||
} else {
|
||||
// table is stable
|
||||
LOG.info("table {} is stable, start index change job {}", tableId, jobId);
|
||||
errMsg = "";
|
||||
return true;
|
||||
}
|
||||
} finally {
|
||||
tbl.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
// Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
|
||||
protected boolean isPreviousLoadFinished() throws AnalysisException {
|
||||
return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
|
||||
@ -266,6 +308,10 @@ public class IndexChangeJob implements Writable {
|
||||
throw new AlterCancelException(e.getMessage());
|
||||
}
|
||||
|
||||
if (!checkTableStable(olapTable)) {
|
||||
return;
|
||||
}
|
||||
|
||||
olapTable.readLock();
|
||||
try {
|
||||
List<Column> originSchemaColumns = olapTable.getSchemaByIndexId(originIndexId, true);
|
||||
@ -308,10 +354,36 @@ public class IndexChangeJob implements Writable {
|
||||
|
||||
protected void runRunningJob() throws AlterCancelException {
|
||||
Preconditions.checkState(jobState == JobState.RUNNING, jobState);
|
||||
// must check if db or table still exist first.
|
||||
// or if table is dropped, the tasks will never be finished,
|
||||
// and the job will be in RUNNING state forever.
|
||||
Database db = Env.getCurrentInternalCatalog()
|
||||
.getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
|
||||
OlapTable tbl;
|
||||
try {
|
||||
tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
|
||||
} catch (MetaNotFoundException e) {
|
||||
throw new AlterCancelException(e.getMessage());
|
||||
}
|
||||
|
||||
if (!invertedIndexBatchTask.isFinished()) {
|
||||
LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId);
|
||||
// TODO: task failed limit
|
||||
List<AgentTask> tasks = invertedIndexBatchTask.getUnfinishedTasks(2000);
|
||||
for (AgentTask task : tasks) {
|
||||
if (task.getFailedTimes() > 3) {
|
||||
LOG.warn("alter inverted index task failed: " + task.getErrorMsg());
|
||||
List<Long> failedBackends = failedTabletBackends.computeIfAbsent(task.getTabletId(),
|
||||
k -> Lists.newArrayList());
|
||||
failedBackends.add(task.getBackendId());
|
||||
int expectSucceedTaskNum = tbl.getPartitionInfo()
|
||||
.getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
|
||||
int failedTaskCount = failedBackends.size();
|
||||
if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
|
||||
throw new AlterCancelException("inverted index tasks failed on same tablet reach threshold "
|
||||
+ failedTaskCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -2944,13 +2944,14 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
throw new DdlException("Nothing is changed. please check your alter stmt.");
|
||||
}
|
||||
|
||||
long timeoutSecond = Config.alter_table_timeout_second;
|
||||
for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
|
||||
long originIndexId = entry.getKey();
|
||||
for (Partition partition : olapTable.getPartitions()) {
|
||||
// create job
|
||||
long jobId = Env.getCurrentEnv().getNextId();
|
||||
IndexChangeJob indexChangeJob = new IndexChangeJob(
|
||||
jobId, db.getId(), olapTable.getId(), olapTable.getName());
|
||||
jobId, db.getId(), olapTable.getId(), olapTable.getName(), timeoutSecond * 1000);
|
||||
indexChangeJob.setOriginIndexId(originIndexId);
|
||||
indexChangeJob.setAlterInvertedIndexInfo(isDropOp, alterIndexes);
|
||||
long partitionId = partition.getId();
|
||||
|
||||
Reference in New Issue
Block a user