diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
index e6f4c4e0a0..6f2b358e16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterInvertedIndexTask;
@@ -46,6 +47,7 @@ import org.apache.doris.thrift.TTaskType;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -54,6 +56,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 public class IndexChangeJob implements Writable {
 
@@ -106,6 +109,12 @@ public class IndexChangeJob implements Writable {
     private long originIndexId;
     @SerializedName(value = "invertedIndexBatchTask")
     AgentBatchTask invertedIndexBatchTask = new AgentBatchTask();
+    // backends on which a tablet's task has failed more than 3 times, tablet id -> backend ids
+    @SerializedName(value = "failedTabletBackends")
+    protected Map<Long, List<Long>> failedTabletBackends = Maps.newHashMap();
+    // job timeout in milliseconds; a non-positive value disables the timeout check
+    @SerializedName(value = "timeoutMs")
+    protected long timeoutMs = -1;
 
     public IndexChangeJob() {
         this.jobId = -1;
@@ -117,7 +126,7 @@ public class IndexChangeJob implements Writable {
         this.jobState = JobState.WAITING_TXN;
     }
 
-    public IndexChangeJob(long jobId, long dbId, long tableId, String tableName) {
+    public IndexChangeJob(long jobId, long dbId, long tableId, String tableName, long timeoutMs) {
         this.jobId = jobId;
         this.dbId = dbId;
         this.tableId = tableId;
@@ -127,6 +136,7 @@ public class IndexChangeJob implements Writable {
         this.jobState = JobState.WAITING_TXN;
         this.watershedTxnId = Env.getCurrentGlobalTransactionMgr()
                 .getTransactionIDGenerator().getNextTransactionId();
+        this.timeoutMs = timeoutMs;
     }
 
     public long getJobId() {
@@ -207,6 +217,15 @@ public class IndexChangeJob implements Writable {
         this.finishedTimeMs = finishedTimeMs;
     }
 
+    /**
+     * Whether the job has run longer than timeoutMs, measured from createTimeMs.
+     * A non-positive timeoutMs (such as the -1 field default) is treated as
+     * "no timeout", so a job without a configured timeout is never cancelled here.
+     */
+    public boolean isTimeout() {
+        return timeoutMs > 0 && System.currentTimeMillis() - createTimeMs > timeoutMs;
+    }
+
     /**
      * The keyword 'synchronized' only protects 2 methods:
      * run() and cancel()
@@ -218,6 +237,10 @@ public class IndexChangeJob implements Writable {
      * db lock
      */
     public synchronized void run() {
+        if (isTimeout()) {
+            cancelImpl("Timeout");
+            return;
+        }
         try {
             switch (jobState) {
                 case WAITING_TXN:
@@ -238,6 +261,31 @@ public class IndexChangeJob implements Writable {
         return cancelImpl(errMsg);
     }
 
+    /**
+     * Checks, under the table write lock, whether the table is stable; call before executing the job.
+     * Returns false and sets errMsg to "table is unstable" when it is not; clears errMsg otherwise.
+     */
+    protected boolean checkTableStable(OlapTable tbl) throws AlterCancelException {
+        tbl.writeLockOrAlterCancelException();
+        try {
+            boolean isStable = tbl.isStable(Env.getCurrentSystemInfo(),
+                    Env.getCurrentEnv().getTabletScheduler());
+
+            if (!isStable) {
+                errMsg = "table is unstable";
+                LOG.warn("wait table {} to be stable before doing index change job", tableId);
+                return false;
+            } else {
+                // table is stable
+                LOG.info("table {} is stable, start index change job {}", tableId, jobId);
+                errMsg = "";
+                return true;
+            }
+        } finally {
+            tbl.writeUnlock();
+        }
+    }
+
     // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished.
     protected boolean isPreviousLoadFinished() throws AnalysisException {
         return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
@@ -266,6 +314,10 @@ public class IndexChangeJob implements Writable {
             throw new AlterCancelException(e.getMessage());
         }
 
+        if (!checkTableStable(olapTable)) {
+            return;
+        }
+
         olapTable.readLock();
         try {
             List<Column> originSchemaColumns = olapTable.getSchemaByIndexId(originIndexId, true);
@@ -308,10 +360,40 @@ public class IndexChangeJob implements Writable {
 
     protected void runRunningJob() throws AlterCancelException {
         Preconditions.checkState(jobState == JobState.RUNNING, jobState);
+        // must check if db or table still exist first.
+        // or if table is dropped, the tasks will never be finished,
+        // and the job will be in RUNNING state forever.
+        Database db = Env.getCurrentInternalCatalog()
+                .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));
+        OlapTable tbl;
+        try {
+            tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
+        } catch (MetaNotFoundException e) {
+            throw new AlterCancelException(e.getMessage());
+        }
 
         if (!invertedIndexBatchTask.isFinished()) {
             LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId);
-            // TODO: task failed limit
+            List<AgentTask> tasks = invertedIndexBatchTask.getUnfinishedTasks(2000);
+            for (AgentTask task : tasks) {
+                if (task.getFailedTimes() > 3) {
+                    LOG.warn("alter inverted index task failed: " + task.getErrorMsg());
+                    List<Long> failedBackends = failedTabletBackends.computeIfAbsent(task.getTabletId(),
+                            k -> Lists.newArrayList());
+                    // this loop does not mark the task finished, so later runs can see it again;
+                    // record each backend only once per tablet to keep the failure count accurate
+                    if (!failedBackends.contains(task.getBackendId())) {
+                        failedBackends.add(task.getBackendId());
+                    }
+                    int expectSucceedTaskNum = tbl.getPartitionInfo()
+                            .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum();
+                    int failedTaskCount = failedBackends.size();
+                    if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) {
+                        throw new AlterCancelException("inverted index tasks failed on same tablet reach threshold "
+                                + failedTaskCount);
+                    }
+                }
+            }
             return;
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index 602b7600c9..a3572caad3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -2944,13 +2944,14 @@ public class SchemaChangeHandler extends AlterHandler {
                 throw new DdlException("Nothing is changed. please check your alter stmt.");
             }
 
+            long timeoutSecond = Config.alter_table_timeout_second;
             for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
                 long originIndexId = entry.getKey();
                 for (Partition partition : olapTable.getPartitions()) {
                     // create job
                     long jobId = Env.getCurrentEnv().getNextId();
                     IndexChangeJob indexChangeJob = new IndexChangeJob(
-                            jobId, db.getId(), olapTable.getId(), olapTable.getName());
+                            jobId, db.getId(), olapTable.getId(), olapTable.getName(), timeoutSecond * 1000);
                     indexChangeJob.setOriginIndexId(originIndexId);
                     indexChangeJob.setAlterInvertedIndexInfo(isDropOp, alterIndexes);
                     long partitionId = partition.getId();
diff --git a/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy
new file mode 100644
index 0000000000..bfbf5a4896
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_build_index_with_clone_fault_injection", "nonConcurrent"){
+    def backends = sql_return_maparray('show backends')
+    // if there are fewer than 2 backends, skip this case
+    if (backends.size() < 2) {
+        return
+    }
+    def timeout = 300000
+    def delta_time = 1000
+    def alter_res = "null"
+    def useTime = 0
+
+    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout -> // polls every delta_time ms, up to OpTimeout ms
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() == 0) {
+                logger.info(table_name + " last index job finished")
+                return "SKIPPED"
+            }
+            if (alter_res.size() > 0) {
+                def last_job_state = alter_res[alter_res.size()-1][7]; // column 7 of the last row is read as the job state
+                if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
+                    sleep(10000) // wait for the table state to change back to normal
+                    logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res)
+                    return last_job_state;
+                }
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res)
+        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout")
+        return "wait_timeout"
+    }
+
+    def tbl = 'test_build_index_with_clone'
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("EngineCloneTask.wait_clone")
+        logger.info("add debug point EngineCloneTask.wait_clone")
+        sql """ DROP TABLE IF EXISTS ${tbl} """
+        sql """
+            CREATE TABLE ${tbl} (
+                `k1` int(11) NULL,
+                `k2` int(11) NULL
+            )
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES ("replication_num" = "1")
+        """
+        for (def i = 1; i <= 5; i++) {
+            sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+        }
+
+        sql """ sync """
+
+        // take the first tablet and set its replica status to DROP
+        def tablet = sql_return_maparray("show tablets from ${tbl}")[0]
+        sql """
+            ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "${tablet.TabletId}", "backend_id" = "${tablet.BackendId}", "status" = "drop");
+        """
+        // create index on table
+        sql """ create index idx_k2 on ${tbl}(k2) using inverted """
+        sql """ build index idx_k2 on ${tbl} """
+        // sleep 5s so the build index job has time to report that the table is unstable
+        sleep(5000)
+        def show_build_index = sql_return_maparray("show build index where TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1")
+        assertEquals('WAITING_TXN', show_build_index[0].State)
+        assertEquals('table is unstable', show_build_index[0].Msg)
+
+        def state = wait_for_last_build_index_on_table_finish(tbl, timeout)
+        assertEquals("FINISHED", state)
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("EngineCloneTask.wait_clone")
+    }
+}
diff --git a/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy
new file mode 100644
index 0000000000..9d30ca30c0
--- /dev/null
+++ b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import org.apache.doris.regression.suite.SuiteCluster
+
+suite("test_build_index_with_clone_by_docker"){
+    def timeout = 300000
+    def delta_time = 1000
+    def alter_res = "null"
+    def useTime = 0
+
+    def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout -> // polls every delta_time ms, up to OpTimeout ms
+        for(int t = delta_time; t <= OpTimeout; t += delta_time){
+            alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """
+
+            if (alter_res.size() == 0) {
+                logger.info(table_name + " last index job finished")
+                return "SKIPPED"
+            }
+            if (alter_res.size() > 0) {
+                def last_job_state = alter_res[alter_res.size()-1][7]; // column 7 of the last row is read as the job state
+                if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") {
+                    sleep(10000) // wait for the table state to change back to normal
+                    logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res)
+                    return last_job_state;
+                }
+            }
+            useTime = t
+            sleep(delta_time)
+        }
+        logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res)
+        assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout")
+        return "wait_timeout"
+    }
+
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    options.setFeNum(1)
+    options.setBeNum(3)
+    options.cloudMode = false
+    def tbl = 'test_build_index_with_clone_by_docker'
+    docker(options) {
+        cluster.injectDebugPoints(NodeType.BE, ['EngineCloneTask.wait_clone' : null])
+        sql """ DROP TABLE IF EXISTS ${tbl} """
+        sql """
+            CREATE TABLE ${tbl} (
+                `k1` int(11) NULL,
+                `k2` int(11) NULL
+            )
+            DUPLICATE KEY(`k1`, `k2`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+            PROPERTIES ("replication_num" = "1")
+        """
+        for (def i = 1; i <= 5; i++) {
+            sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
+        }
+
+        sql """ sync """
+
+        // take the first tablet and set its replica status to DROP
+        def tablet = sql_return_maparray("show tablets from ${tbl}")[0]
+        sql """
+            ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "${tablet.TabletId}", "backend_id" = "${tablet.BackendId}", "status" = "drop");
+        """
+        // create index on table
+        sql """ create index idx_k2 on ${tbl}(k2) using inverted """
+        sql """ build index idx_k2 on ${tbl} """
+        // sleep 5s so the build index job has time to report that the table is unstable
+        sleep(5000)
+        def show_build_index = sql_return_maparray("show build index where TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1")
+        assertEquals('WAITING_TXN', show_build_index[0].State)
+        assertEquals('table is unstable', show_build_index[0].Msg)
+
+        def state = wait_for_last_build_index_on_table_finish(tbl, timeout)
+        assertEquals("FINISHED", state)
+    }
+}