From ebc15fc6ccf012d3bd7765fce33518b85adfcf30 Mon Sep 17 00:00:00 2001 From: yujun Date: Mon, 13 Nov 2023 21:39:28 +0800 Subject: [PATCH] [fix](transaction) Fix concurrent schema change and txn cause dead lock (#26428) Concurrent schema change and txn may cause a dead lock. An example: Txn T is committed but not yet published; Run schema change or rollup on T's related partition, which adds alter replica R; sc/rollup adds a sched txn watermark M; Restart fe; After the fe restart, T's loadedTblIndexes is cleared because it is not saved to disk; T will publish its version to all tablets, including sc/rollup's new alter replica R; Since R does not contain the txn's data, T will fail and keep waiting for R's data; sc/rollup waits for all txns before M to finish, and only after that will it let R copy history data; Since T is not finished, sc/rollup will wait forever, so R will never copy history data; Txn T and sc/rollup will wait for each other forever, causing a dead lock; Fix: because sc/rollup ensures double write for txns after the sched watermark M, when finishing a transaction and checking an alter replica: if the txn id is bigger than M, check it just like a normal replica; otherwise skip checking this replica, as the BE will fill in its history data later. --- be/src/olap/tablet_meta.cpp | 22 +++ be/src/olap/tablet_meta.h | 2 + .../olap/task/engine_publish_version_task.cpp | 15 +- .../java/org/apache/doris/common/Config.java | 7 + .../org/apache/doris/alter/AlterJobV2.java | 9 ++ .../org/apache/doris/alter/RollupJobV2.java | 4 - .../apache/doris/alter/SchemaChangeJobV2.java | 3 - .../transaction/DatabaseTransactionMgr.java | 37 ++++- ...test_schema_change_concurrent_with_txn.out | 61 ++++++++ .../doris/regression/suite/Suite.groovy | 7 +- .../regression/suite/SuiteContext.groovy | 26 ++++ ...t_schema_change_concurrent_with_txn.groovy | 135 ++++++++++++++++++ 12 files changed, 306 insertions(+), 22 deletions(-) create mode 100644 regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out create mode 100644 regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 84160c411e..76cf14a75b 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -1074,4 +1074,26 @@ std::shared_ptr DeleteBitmap::get_agg(const BitmapKey& bmk) co std::atomic DeleteBitmap::AggCache::s_repr {nullptr}; +std::string tablet_state_name(TabletState state) { + switch (state) { + case TABLET_NOTREADY: + return "TABLET_NOTREADY"; + + case TABLET_RUNNING: + return "TABLET_RUNNING"; + + case TABLET_TOMBSTONED: + return "TABLET_TOMBSTONED"; + + case TABLET_STOPPED: + return "TABLET_STOPPED"; + + case TABLET_SHUTDOWN: + return "TABLET_SHUTDOWN"; + + default: + return "TabletState(" + std::to_string(state) + ")"; + } +} + } // namespace doris diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 622d5c44f9..60cc485882 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -632,6 +632,8 @@ inline bool TabletMeta::all_beta() const { return true; } +std::string tablet_state_name(TabletState state); + // Only for unit test now. 
bool operator==(const TabletMeta& a, const TabletMeta& b); bool operator!=(const TabletMeta& a, const TabletMeta& b); diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp index d914816e0a..040b5a7ede 100644 --- a/be/src/olap/task/engine_publish_version_task.cpp +++ b/be/src/olap/task/engine_publish_version_task.cpp @@ -244,14 +244,17 @@ Status EnginePublishVersionTask::finish() { add_error_tablet_id(tablet_id); if (res.ok()) { res = Status::Error( - "tablet {} not exists version {}", tablet_id, + "tablet {} with state {} not exists version {}", tablet_id, + tablet_state_name(tablet->tablet_state()), par_ver_info.version); } - LOG(WARNING) << "publish version failed on transaction, tablet version not " - "exists. " - << "transaction_id=" << transaction_id - << ", tablet_id=" << tablet_id - << ", version=" << par_ver_info.version; + LOG(WARNING) + << "publish version failed on transaction, tablet version not " + "exists. " + << "transaction_id=" << transaction_id + << ", tablet_id=" << tablet_id + << ", tablet_state=" << tablet_state_name(tablet->tablet_state()) + << ", version=" << par_ver_info.version; } } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 50c31755a0..2b8e341403 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -439,6 +439,13 @@ public class Config extends ConfigBase { + "then the load task will be successful." }) public static int publish_wait_time_second = 300; + @ConfField(mutable = true, masterOnly = true, description = {"导入 Publish 阶段是否检查正在做 Schema 变更的副本。" + "正常情况下,不要关闭此检查。除非在极端情况下出现导入和 Schema 变更出现互相等待死锁时才临时打开。", "Check the replicas that are doing schema change when publishing a transaction. Do not turn off this check " + "under normal circumstances. Only turn it off temporarily if publish version and schema change " + "dead lock with each other." }) + public static boolean publish_version_check_alter_replica = true; + @ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。" + "该参数仅用于事务型 insert 操作中。", "Maximal waiting time for all data inserted before one transaction to be committed, in seconds. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java index fb616fe429..c1984d31d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterJobV2.java @@ -89,6 +89,11 @@ public abstract class AlterJobV2 implements Writable { @SerializedName(value = "rawSql") protected String rawSql; + // The job will wait until all transactions before this txn id have finished, then send the schema_change/rollup tasks. 
+ @SerializedName(value = "watershedTxnId") + protected long watershedTxnId = -1; + + public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) { this.rawSql = rawSql; @@ -135,6 +140,10 @@ public abstract class AlterJobV2 implements Writable { return tableName; } + public long getWatershedTxnId() { + return watershedTxnId; + } + public boolean isTimeout() { return System.currentTimeMillis() - createTimeMs > timeoutMs; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 6736bd3aa5..dfc825dbbc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -134,10 +134,6 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { @SerializedName(value = "storageFormat") private TStorageFormat storageFormat = TStorageFormat.DEFAULT; - // The rollup job will wait all transactions before this txn id finished, then send the rollup tasks. - @SerializedName(value = "watershedTxnId") - protected long watershedTxnId = -1; - // save all create rollup tasks private AgentBatchTask rollupBatchTask = new AgentBatchTask(); // save failed task after retry three times, tabletId -> agentTask diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 60b19e7f36..a65612989a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -126,9 +126,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { @SerializedName(value = "indexes") private List indexes = null; - // The schema change job will wait all transactions before this txn id finished, then send the schema change tasks. 
- @SerializedName(value = "watershedTxnId") - protected long watershedTxnId = -1; @SerializedName(value = "storageFormat") private TStorageFormat storageFormat = TStorageFormat.DEFAULT; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 42cac0c969..cac271af4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -17,6 +17,7 @@ package org.apache.doris.transaction; +import org.apache.doris.alter.AlterJobV2; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -535,6 +536,7 @@ public class DatabaseTransactionMgr { transactionState.prolongPublishTimeout(); } + // (TODO): ignore the alter index if txn id is less than sc sched watermark int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId()); for (MaterializedIndex index : allIndices) { for (Tablet tablet : index.getTablets()) { @@ -553,6 +555,7 @@ public class DatabaseTransactionMgr { throw new TransactionCommitFailedException("could not find replica for tablet [" + tabletId + "], backend [" + tabletBackend + "]"); } + // if the tablet have no replica's to commit or the tablet is a rolling up tablet, // the commit backends maybe null // if the commit backends is null, set all replicas as error replicas @@ -985,6 +988,7 @@ public class DatabaseTransactionMgr { continue; } + boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionId, table); Iterator partitionCommitInfoIterator = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator(); while (partitionCommitInfoIterator.hasNext()) { @@ -1037,7 +1041,7 @@ public class DatabaseTransactionMgr { tabletWriteFailedReplicas.clear(); tabletVersionFailedReplicas.clear(); for (Replica replica : tablet.getReplicas()) { - checkReplicaContinuousVersionSucc(tablet.getId(), replica, + checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn, partitionCommitInfo.getVersion(), publishTasks.get(replica.getBackendId()), errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas, tabletVersionFailedReplicas); @@ -1132,8 +1136,24 @@ public class DatabaseTransactionMgr { LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name()); } - private void checkReplicaContinuousVersionSucc(long tabletId, Replica replica, long version, - PublishVersionTask backendPublishTask, Set errorReplicaIds, List tabletSuccReplicas, + private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable table) { + List unfinishedAlterJobs = null; + if (table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) { + unfinishedAlterJobs = Env.getCurrentEnv().getAlterInstance().getSchemaChangeHandler() + .getUnfinishedAlterJobV2ByTableId(table.getId()); + } else if (table.getState() == OlapTable.OlapTableState.ROLLUP) { + unfinishedAlterJobs = Env.getCurrentEnv().getAlterInstance().getMaterializedViewHandler() + .getUnfinishedAlterJobV2ByTableId(table.getId()); + } else { + return true; + } + + return unfinishedAlterJobs.stream().allMatch(job -> transactionId > job.getWatershedTxnId()); + } + + private void checkReplicaContinuousVersionSucc(long tabletId, Replica replica, boolean alterReplicaLoadedTxn, + long version, PublishVersionTask backendPublishTask, + Set errorReplicaIds, List 
tabletSuccReplicas, List tabletWriteFailedReplicas, List tabletVersionFailedReplicas) { if (backendPublishTask == null || !backendPublishTask.isFinished()) { errorReplicaIds.add(replica.getId()); @@ -1155,6 +1175,17 @@ } } + // Schema change and rollup have a sched watermark, + // which ensures that alter replicas will load those txns whose txn id > sched watermark. + // But for txns before the sched watermark, the alter replicas may not have loaded them, + // so publish skips checking them and treats them as successful in advance. + // Later the BE will fill in the alter replica's history data from before the sched watermark. + // If that fill fails, the FE will mark the alter replica as bad. + if (replica.getState() == Replica.ReplicaState.ALTER + && (!alterReplicaLoadedTxn || !Config.publish_version_check_alter_replica)) { + errorReplicaIds.remove(replica.getId()); + } + if (!errorReplicaIds.contains(replica.getId())) { if (replica.checkVersionCatchUp(version - 1, true)) { tabletSuccReplicas.add(replica); diff --git a/regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out b/regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out new file mode 100644 index 0000000000..1f531cb770 --- /dev/null +++ b/regression-test/data/schema_change/test_schema_change_concurrent_with_txn.out @@ -0,0 +1,61 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_1_1 -- +1 10 +2 20 + +-- !select_2_1 -- +1 10 +2 20 + +-- !select_3_1 -- +1 11 111 +2 22 222 + +-- !select_4_1 -- +1 10 +2 20 + +-- !select_1_2 -- +1 10 +2 20 + +-- !select_2_2 -- +1 10 +2 20 + +-- !select_3_2 -- +1 11 111 +2 22 222 + +-- !select_4_2 -- +1 10 +2 20 + +-- !select_1_3 -- +1 10 -1 +2 20 -1 +3 30 -1 +4 40 -1 +5 50 -1 + +-- !select_2_3 -- +1 10 +2 20 +3 30 +4 40 +5 50 + +-- !select_3_3 -- +1 11 111 +2 22 222 +3 33 333 +4 44 444 +5 55 555 + +-- !select_4_3 -- +10 1 +20 2 +30 3 +40 4 +50 5 + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 0c02028c3e..c9e8fe4215 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -168,12 +168,7 @@ try { Thread.currentThread().setName(threadName == null ? 
originThreadName : threadName) if (connInfo != null) { - def newConnInfo = new ConnectionInfo() - newConnInfo.conn = DriverManager.getConnection(connInfo.conn.getMetaData().getURL(), - connInfo.username, connInfo.password) - newConnInfo.username = connInfo.username - newConnInfo.password = connInfo.password - context.threadLocalConn.set(newConnInfo) + context.connectTo(connInfo.conn.getMetaData().getURL(), connInfo.username, connInfo.password); } context.scriptContext.eventListeners.each { it.onThreadStarted(context) } diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy index d31364eb0e..31eed47839 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/SuiteContext.groovy @@ -250,6 +250,32 @@ class SuiteContext implements Closeable { } } + public void reconnectFe() { + ConnectionInfo connInfo = threadLocalConn.get() + if (connInfo == null) { + return + } + connectTo(connInfo.conn.getMetaData().getURL(), connInfo.username, connInfo.password); + } + + public void connectTo(String url, String username, String password) { + ConnectionInfo oldConn = threadLocalConn.get() + if (oldConn != null) { + threadLocalConn.remove() + try { + oldConn.conn.close() + } catch (Throwable t) { + log.warn("Close connection failed", t) + } + } + + def newConnInfo = new ConnectionInfo() + newConnInfo.conn = DriverManager.getConnection(url, username, password) + newConnInfo.username = username + newConnInfo.password = password + threadLocalConn.set(newConnInfo) + } + OutputUtils.OutputBlocksIterator getOutputIterator() { def outputIt = threadLocalOutputIterator.get() if (outputIt == null) { diff --git a/regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy b/regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy new file mode 100644 index 0000000000..4e2f717ed4 --- /dev/null +++ b/regression-test/suites/schema_change/test_schema_change_concurrent_with_txn.groovy @@ -0,0 +1,135 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.doris.regression.util.NodeType + +suite('test_schema_change_concurrent_with_txn') { + def options = new ClusterOptions() + options.enableDebugPoints() + options.feConfigs.add('publish_wait_time_second=-1') + docker(options) { + sql 'SET GLOBAL insert_visible_timeout_ms = 2000' + + def result = sql 'SELECT DATABASE()' + def dbName = result[0][0] + + sql 'CREATE TABLE tbl_1 (k1 INT, k2 INT) PROPERTIES ( "light_schema_change" = "false")' + sql 'INSERT INTO tbl_1 VALUES (1, 10)' + sql 'INSERT INTO tbl_1 VALUES (2, 20)' + order_qt_select_1_1 'SELECT * FROM tbl_1' + + sql 'CREATE TABLE tbl_2 AS SELECT * FROM tbl_1' + order_qt_select_2_1 'SELECT * FROM tbl_2' + + sql 'CREATE TABLE tbl_3 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, k2)' + sql 'INSERT INTO tbl_3 VALUES (1, 11, 111)' + sql 'INSERT INTO tbl_3 VALUES (2, 22, 222)' + order_qt_select_3_1 'SELECT * FROM tbl_3' + + sql 'CREATE TABLE tbl_4 (k1 INT, k2 INT)' + sql 'INSERT INTO tbl_4 VALUES (1, 10)' + sql 'INSERT INTO tbl_4 VALUES (2, 20)' + order_qt_select_4_1 'SELECT * FROM tbl_4' + + // stop publish; inserts succeed, txns are committed but not yet visible + cluster.injectDebugPoints(NodeType.FE, ['PublishVersionDaemon.stop_publish':null]) + + sql 'INSERT INTO tbl_1 VALUES (3, 30)' + sql 'INSERT INTO tbl_1 VALUES (4, 40)' + order_qt_select_1_2 'SELECT * FROM tbl_1' + + sql 'INSERT INTO tbl_2 VALUES (3, 30)' + sql 'INSERT INTO tbl_2 VALUES (4, 40)' + order_qt_select_2_2 'SELECT * FROM tbl_2' + + sql 'INSERT INTO tbl_3 VALUES (3, 33, 333)' + sql 'INSERT INTO tbl_3 VALUES (4, 44, 444)' + order_qt_select_3_2 'SELECT * FROM tbl_3' + + sql 'INSERT INTO tbl_4 VALUES (3, 30)' + sql 'INSERT INTO tbl_4 VALUES (4, 40)' + order_qt_select_4_2 'SELECT * FROM tbl_4' + + result = sql 'SHOW PROC "/transactions"' + def runningTxn = result.find({ it[1].indexOf(dbName) > 0 })[2] + assertEquals(8, runningTxn as int) + + sql "ALTER TABLE tbl_1 ADD COLUMN k3 INT DEFAULT '-1'" + sql 'CREATE MATERIALIZED VIEW tbl_2_mv AS SELECT k1, k1 + k2 FROM tbl_2' + sql 'ALTER TABLE tbl_3 ADD ROLLUP tbl_3_r1(k1, v)' + sql 'ALTER TABLE tbl_4 ORDER BY (k2, k1)' + + sleep(5000) + + def jobs = null + def scJobState = { job -> job[9] } + def rollupJobState = { job -> job[8] } + + jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_1'" + assertEquals(1, jobs.size()) + assertEquals('WAITING_TXN', scJobState(jobs[0])) + + jobs = sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName = 'tbl_2'" + assertEquals(1, jobs.size()) + assertEquals('WAITING_TXN', rollupJobState(jobs[0])) + + jobs = sql "SHOW ALTER TABLE ROLLUP WHERE TableName = 'tbl_3'" + assertEquals(1, jobs.size()) + assertEquals('WAITING_TXN', rollupJobState(jobs[0])) + + jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_4'" + assertEquals(1, jobs.size()) + assertEquals('WAITING_TXN', scJobState(jobs[0])) + + sql 'INSERT INTO tbl_1(k1, k2) VALUES (5, 50)' + sql 'INSERT INTO tbl_2 VALUES (5, 50)' + sql 'INSERT INTO tbl_3 VALUES (5, 55, 555)' + sql 'INSERT INTO tbl_4(k1, k2) VALUES (5, 50)' + + // After fe restarts, a transaction's loadedTblIndexes will be cleared, + // so fe will send publish tasks to all indexes. + // But the alter index may have been added after the txn was committed, so publishing to it would fail. 
+ cluster.restartFrontends() + sleep(30000) + context.reconnectFe() + + //cluster.clearFrontendDebugPoints() + + // txns should now be published and become visible + order_qt_select_1_3 'SELECT * FROM tbl_1' + order_qt_select_2_3 'SELECT * FROM tbl_2' + order_qt_select_3_3 'SELECT * FROM tbl_3' + order_qt_select_4_3 'SELECT * FROM tbl_4' + + jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_1'" + assertEquals(1, jobs.size()) + assertEquals('FINISHED', scJobState(jobs[0])) + + jobs = sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName = 'tbl_2'" + assertEquals(1, jobs.size()) + assertEquals('FINISHED', rollupJobState(jobs[0])) + + jobs = sql "SHOW ALTER TABLE ROLLUP WHERE TableName = 'tbl_3'" + assertEquals(1, jobs.size()) + assertEquals('FINISHED', rollupJobState(jobs[0])) + + jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_4'" + assertEquals(1, jobs.size()) + assertEquals('FINISHED', scJobState(jobs[0])) + } +}
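
A minimal sketch of the watermark rule described in the commit message, kept separate from the patch itself; the class and method names here are hypothetical, and the real logic lives in the isAlterReplicaLoadedTxn / checkReplicaContinuousVersionSucc changes in DatabaseTransactionMgr.java above:

    // Illustrative sketch only; not part of this patch.
    final class AlterReplicaPublishRule {
        // Returns true if publish must verify the replica's version for the given txn.
        // Txns with an id greater than the alter job's watershed txn id are double
        // written to the alter replica, so they are checked like normal replicas;
        // older txns are skipped because the BE backfills that history later.
        // checkAlterReplica mirrors Config.publish_version_check_alter_replica.
        static boolean mustCheck(boolean isAlterReplica, long txnId, long watershedTxnId,
                                 boolean checkAlterReplica) {
            if (!isAlterReplica) {
                return true; // normal replicas are always checked
            }
            return checkAlterReplica && txnId > watershedTxnId;
        }
    }

Under these assumptions, a txn committed before the schema change job's watershed would be treated as successful on the alter replica instead of blocking publish, which is exactly the wait cycle the fix breaks.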