From 98fccb18094a2de934ad2fee5e1543eb13954dd3 Mon Sep 17 00:00:00 2001 From: qiye Date: Sun, 16 Jun 2024 09:34:32 +0800 Subject: [PATCH] [improvement](build index)Make build index and clone mutually exclusive and add timeout for index change job (#36293) Currently the index change job and clone task can be executed at the same time. If the clone task gets stuck at this point, it will cause the index change job to get stuck as well and keep retrying. To solve this problem, we can refer to alter job and make index change job exclusive with clone task, and introduce the timeout to prevent infinite retries of build index. Add the following checks and status in FE. 1. Check if table is stable (build index is not allowed when clone is in progress) 1.1. Tablet is HEALTHY. 1.2. Whether the tablet is included in the Tablet scheduler, if so, it means the current tablet is doing clone. 2. When creating the index change job, set the timeout at the same time. pick from master #35724 --- .../apache/doris/alter/IndexChangeJob.java | 76 ++++++++++++++- .../doris/alter/SchemaChangeHandler.java | 3 +- .../test_build_index_with_clone_fault.groovy | 93 ++++++++++++++++++ ...st_build_index_with_clone_by_docker.groovy | 94 +++++++++++++++++++ 4 files changed, 263 insertions(+), 3 deletions(-) create mode 100644 regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy create mode 100644 regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java index e6f4c4e0a0..6f2b358e16 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/IndexChangeJob.java @@ -37,6 +37,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.persist.gson.GsonUtils; import 
org.apache.doris.task.AgentBatchTask; +import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterInvertedIndexTask; @@ -46,6 +47,7 @@ import org.apache.doris.thrift.TTaskType; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -54,6 +56,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.List; +import java.util.Map; public class IndexChangeJob implements Writable { @@ -106,6 +109,11 @@ public class IndexChangeJob implements Writable { private long originIndexId; @SerializedName(value = "invertedIndexBatchTask") AgentBatchTask invertedIndexBatchTask = new AgentBatchTask(); + // save failed task after retry three times, tablet -> backends + @SerializedName(value = "failedTabletBackends") + protected Map> failedTabletBackends = Maps.newHashMap(); + @SerializedName(value = "timeoutMs") + protected long timeoutMs = -1; public IndexChangeJob() { this.jobId = -1; @@ -117,7 +125,7 @@ public class IndexChangeJob implements Writable { this.jobState = JobState.WAITING_TXN; } - public IndexChangeJob(long jobId, long dbId, long tableId, String tableName) { + public IndexChangeJob(long jobId, long dbId, long tableId, String tableName, long timeoutMs) { this.jobId = jobId; this.dbId = dbId; this.tableId = tableId; @@ -127,6 +135,7 @@ public class IndexChangeJob implements Writable { this.jobState = JobState.WAITING_TXN; this.watershedTxnId = Env.getCurrentGlobalTransactionMgr() .getTransactionIDGenerator().getNextTransactionId(); + this.timeoutMs = timeoutMs; } public long getJobId() { @@ -207,6 +216,10 @@ public class IndexChangeJob implements Writable { this.finishedTimeMs = 
finishedTimeMs; } + public boolean isTimeout() { + return System.currentTimeMillis() - createTimeMs > timeoutMs; + } + /** * The keyword 'synchronized' only protects 2 methods: * run() and cancel() @@ -218,6 +231,10 @@ public class IndexChangeJob implements Writable { * db lock */ public synchronized void run() { + if (isTimeout()) { + cancelImpl("Timeout"); + return; + } try { switch (jobState) { case WAITING_TXN: @@ -238,6 +255,31 @@ public class IndexChangeJob implements Writable { return cancelImpl(errMsg); } + /** + * should be called before executing the job. + * return false if table is not stable. + */ + protected boolean checkTableStable(OlapTable tbl) throws AlterCancelException { + tbl.writeLockOrAlterCancelException(); + try { + boolean isStable = tbl.isStable(Env.getCurrentSystemInfo(), + Env.getCurrentEnv().getTabletScheduler()); + + if (!isStable) { + errMsg = "table is unstable"; + LOG.warn("wait table {} to be stable before doing index change job", tableId); + return false; + } else { + // table is stable + LOG.info("table {} is stable, start index change job {}", tableId, jobId); + errMsg = ""; + return true; + } + } finally { + tbl.writeUnlock(); + } + } + // Check whether transactions of the given database which txnId is less than 'watershedTxnId' are finished. protected boolean isPreviousLoadFinished() throws AnalysisException { return Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished( @@ -266,6 +308,10 @@ public class IndexChangeJob implements Writable { throw new AlterCancelException(e.getMessage()); } + if (!checkTableStable(olapTable)) { + return; + } + olapTable.readLock(); try { List originSchemaColumns = olapTable.getSchemaByIndexId(originIndexId, true); @@ -308,10 +354,36 @@ public class IndexChangeJob implements Writable { protected void runRunningJob() throws AlterCancelException { Preconditions.checkState(jobState == JobState.RUNNING, jobState); + // must check if db or table still exist first. 
+ // or if table is dropped, the tasks will never be finished, + // and the job will be in RUNNING state forever. + Database db = Env.getCurrentInternalCatalog() + .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist")); + OlapTable tbl; + try { + tbl = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP); + } catch (MetaNotFoundException e) { + throw new AlterCancelException(e.getMessage()); + } if (!invertedIndexBatchTask.isFinished()) { LOG.info("inverted index tasks not finished. job: {}, partitionId: {}", jobId, partitionId); - // TODO: task failed limit + List tasks = invertedIndexBatchTask.getUnfinishedTasks(2000); + for (AgentTask task : tasks) { + if (task.getFailedTimes() > 3) { + LOG.warn("alter inverted index task failed: " + task.getErrorMsg()); + List failedBackends = failedTabletBackends.computeIfAbsent(task.getTabletId(), + k -> Lists.newArrayList()); + failedBackends.add(task.getBackendId()); + int expectSucceedTaskNum = tbl.getPartitionInfo() + .getReplicaAllocation(task.getPartitionId()).getTotalReplicaNum(); + int failedTaskCount = failedBackends.size(); + if (expectSucceedTaskNum - failedTaskCount < expectSucceedTaskNum / 2 + 1) { + throw new AlterCancelException("inverted index tasks failed on same tablet reach threshold " + + failedTaskCount); + } + } + } return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 602b7600c9..a3572caad3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2944,13 +2944,14 @@ public class SchemaChangeHandler extends AlterHandler { throw new DdlException("Nothing is changed. 
please check your alter stmt."); } + long timeoutSecond = Config.alter_table_timeout_second; for (Map.Entry> entry : changedIndexIdToSchema.entrySet()) { long originIndexId = entry.getKey(); for (Partition partition : olapTable.getPartitions()) { // create job long jobId = Env.getCurrentEnv().getNextId(); IndexChangeJob indexChangeJob = new IndexChangeJob( - jobId, db.getId(), olapTable.getId(), olapTable.getName()); + jobId, db.getId(), olapTable.getId(), olapTable.getName(), timeoutSecond * 1000); indexChangeJob.setOriginIndexId(originIndexId); indexChangeJob.setAlterInvertedIndexInfo(isDropOp, alterIndexes); long partitionId = partition.getId(); diff --git a/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy new file mode 100644 index 0000000000..bfbf5a4896 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_build_index_with_clone_fault.groovy @@ -0,0 +1,93 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_build_index_with_clone_fault_injection", "nonConcurrent"){ + def backends = sql_return_maparray('show backends') + // skip this case if there are fewer than 2 backends + if (backends.size() < 2) { + return + } + def timeout = 300000 + def delta_time = 1000 + def alter_res = "null" + def useTime = 0 + + def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """ + + if (alter_res.size() == 0) { + logger.info(table_name + " last index job finished") + return "SKIPPED" + } + if (alter_res.size() > 0) { + def last_job_state = alter_res[alter_res.size()-1][7]; + if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") { + sleep(10000) // wait change table state to normal + logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res) + return last_job_state; + } + } + useTime = t + sleep(delta_time) + } + logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res) + assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout") + return "wait_timeout" + } + + def tbl = 'test_build_index_with_clone' + try { + GetDebugPoint().enableDebugPointForAllBEs("EngineCloneTask.wait_clone") + logger.info("add debug point EngineCloneTask.wait_clone") + sql """ DROP TABLE IF EXISTS ${tbl} """ + sql """ + CREATE TABLE ${tbl} ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + for (def i = 1; i <= 5; i++) { + sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})" + } + + sql """ sync """ + + // get tablets and set replica status to DROP + def tablet = sql_return_maparray("show tablets from ${tbl}")[0] + sql """ + ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "${tablet.TabletId}", "backend_id" =
"${tablet.BackendId}", "status" = "drop"); + """ + // create index on table + sql """ create index idx_k2 on ${tbl}(k2) using inverted """ + sql """ build index idx_k2 on ${tbl} """ + // sleep 5s so the build index job has time to report that the table is unstable + sleep(5000) + def show_build_index = sql_return_maparray("show build index where TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1") + assertEquals('WAITING_TXN', show_build_index[0].State) + assertEquals('table is unstable', show_build_index[0].Msg) + + def state = wait_for_last_build_index_on_table_finish(tbl, timeout) + assertEquals(state, "FINISHED") + } finally { + GetDebugPoint().disableDebugPointForAllBEs("EngineCloneTask.wait_clone") + } +} diff --git a/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy new file mode 100644 index 0000000000..9d30ca30c0 --- /dev/null +++ b/regression-test/suites/inverted_index_p0/test_build_index_with_clone_by_docker.groovy @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType +import org.apache.doris.regression.suite.SuiteCluster + +suite("test_build_index_with_clone_by_docker"){ + def timeout = 300000 + def delta_time = 1000 + def alter_res = "null" + def useTime = 0 + + def wait_for_last_build_index_on_table_finish = { table_name, OpTimeout -> + for(int t = delta_time; t <= OpTimeout; t += delta_time){ + alter_res = sql """SHOW BUILD INDEX WHERE TableName = "${table_name}" ORDER BY JobId """ + + if (alter_res.size() == 0) { + logger.info(table_name + " last index job finished") + return "SKIPPED" + } + if (alter_res.size() > 0) { + def last_job_state = alter_res[alter_res.size()-1][7]; + if (last_job_state == "FINISHED" || last_job_state == "CANCELLED") { + sleep(10000) // wait change table state to normal + logger.info(table_name + " last index job finished, state: " + last_job_state + ", detail: " + alter_res) + return last_job_state; + } + } + useTime = t + sleep(delta_time) + } + logger.info("wait_for_last_build_index_on_table_finish debug: " + alter_res) + assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_finish timeout") + return "wait_timeout" + } + + def options = new ClusterOptions() + options.enableDebugPoints() + options.setFeNum(1) + options.setBeNum(3) + options.cloudMode = false + def tbl = 'test_build_index_with_clone_by_docker' + docker(options) { + cluster.injectDebugPoints(NodeType.BE, ['EngineCloneTask.wait_clone' : null]) + sql """ DROP TABLE IF EXISTS ${tbl} """ + sql """ + CREATE TABLE ${tbl} ( + `k1` int(11) NULL, + `k2` int(11) NULL + ) + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ("replication_num" = "1") + """ + for (def i = 1; i <= 5; i++) { + sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})" + } + + sql """ sync """ + + // get tablets and set replica status to DROP + def tablet = sql_return_maparray("show tablets from ${tbl}")[0] + 
sql """ + ADMIN SET REPLICA STATUS PROPERTIES("tablet_id" = "${tablet.TabletId}", "backend_id" = "${tablet.BackendId}", "status" = "drop"); + """ + // create index on table + sql """ create index idx_k2 on ${tbl}(k2) using inverted """ + sql """ build index idx_k2 on ${tbl} """ + // sleep 5s so the build index job has time to report that the table is unstable + sleep(5000) + def show_build_index = sql_return_maparray("show build index where TableName = \"${tbl}\" ORDER BY JobId DESC LIMIT 1") + assertEquals('WAITING_TXN', show_build_index[0].State) + assertEquals('table is unstable', show_build_index[0].Msg) + + def state = wait_for_last_build_index_on_table_finish(tbl, timeout) + assertEquals(state, "FINISHED") + } +}