[fix](transaction) Fix concurrent schema change and txn cause dead lock (#26428)

Concurrent schema change and txn may cause dead lock. An example:

Txn T commit but not publish;
Run schema change or rollup on T's related partition, add alter replica R;
sc/rollup add a sched txn watermark M;
Restart fe;
After fe restart, T's loadedTblIndexes will clear because it's not save to disk;
T will publish version to all tablet, including sc/rollup's new alter replica R;
Since R not contains txn data, so the T will fail. It will then always waitting for R's data;
sc/rollup wait for txn before M to finish, only after that it will let R copy history data;
Since T's not finished, so sc/rollup will always wait, so R will nerver copy history data;
Txn T and sc/rollup will wait each other forever, cause dead lock;
Fix: because sc/rollup will ensure double write after the sched watermark M, so for finish transaction, when checking a alter replica:

if txn id is bigger than M, check it just like a normal replica;
otherwise skip check this replica, the BE will modify history data later.
This commit is contained in:
yujun
2023-11-13 21:39:28 +08:00
committed by GitHub
parent 7b50a62f0c
commit ebc15fc6cc
12 changed files with 306 additions and 22 deletions

View File

@ -1074,4 +1074,26 @@ std::shared_ptr<roaring::Roaring> DeleteBitmap::get_agg(const BitmapKey& bmk) co
std::atomic<ShardedLRUCache*> DeleteBitmap::AggCache::s_repr {nullptr};
std::string tablet_state_name(TabletState state) {
switch (state) {
case TABLET_NOTREADY:
return "TABLET_NOTREADY";
case TABLET_RUNNING:
return "TABLET_RUNNING";
case TABLET_TOMBSTONED:
return "TABLET_TOMBSTONED";
case TABLET_STOPPED:
return "TABLET_STOPPED";
case TABLET_SHUTDOWN:
return "TABLET_SHUTDOWN";
default:
return "TabletState(" + std::to_string(state) + ")";
}
}
} // namespace doris

View File

@ -632,6 +632,8 @@ inline bool TabletMeta::all_beta() const {
return true;
}
std::string tablet_state_name(TabletState state);
// Only for unit test now.
bool operator==(const TabletMeta& a, const TabletMeta& b);
bool operator!=(const TabletMeta& a, const TabletMeta& b);

View File

@ -244,14 +244,17 @@ Status EnginePublishVersionTask::finish() {
add_error_tablet_id(tablet_id);
if (res.ok()) {
res = Status::Error<VERSION_NOT_EXIST>(
"tablet {} not exists version {}", tablet_id,
"tablet {} with state {} not exists version {}", tablet_id,
tablet_state_name(tablet->tablet_state()),
par_ver_info.version);
}
LOG(WARNING) << "publish version failed on transaction, tablet version not "
"exists. "
<< "transaction_id=" << transaction_id
<< ", tablet_id=" << tablet_id
<< ", version=" << par_ver_info.version;
LOG(WARNING)
<< "publish version failed on transaction, tablet version not "
"exists. "
<< "transaction_id=" << transaction_id
<< ", tablet_id=" << tablet_id
<< ", tablet_state=" << tablet_state_name(tablet->tablet_state())
<< ", version=" << par_ver_info.version;
}
}
}

View File

@ -439,6 +439,13 @@ public class Config extends ConfigBase {
+ "then the load task will be successful." })
public static int publish_wait_time_second = 300;
@ConfField(mutable = true, masterOnly = true, description = {"导入 Publish 阶段是否检查正在做 Schema 变更的副本。"
+ "正常情况下,不要关闭此检查。除非在极端情况下出现导入和 Schema 变更出现互相等待死锁时才临时打开。",
"Check the replicas which are doing schema change when publish transaction. Do not turn off this check "
+ " under normal circumstances. It's only temporarily skip check if publish version and schema change have"
+ " dead lock" })
public static boolean publish_version_check_alter_replica = true;
@ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
+ "该参数仅用于事务型 insert 操作中。",
"Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "

View File

@ -89,6 +89,11 @@ public abstract class AlterJobV2 implements Writable {
@SerializedName(value = "rawSql")
protected String rawSql;
// The job will wait all transactions before this txn id finished, then send the schema_change/rollup tasks.
@SerializedName(value = "watershedTxnId")
protected long watershedTxnId = -1;
public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long tableId, String tableName,
long timeoutMs) {
this.rawSql = rawSql;
@ -135,6 +140,10 @@ public abstract class AlterJobV2 implements Writable {
return tableName;
}
public long getWatershedTxnId() {
return watershedTxnId;
}
public boolean isTimeout() {
return System.currentTimeMillis() - createTimeMs > timeoutMs;
}

View File

@ -134,10 +134,6 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
@SerializedName(value = "storageFormat")
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
// The rollup job will wait all transactions before this txn id finished, then send the rollup tasks.
@SerializedName(value = "watershedTxnId")
protected long watershedTxnId = -1;
// save all create rollup tasks
private AgentBatchTask rollupBatchTask = new AgentBatchTask();
// save failed task after retry three times, tabletId -> agentTask

View File

@ -126,9 +126,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
@SerializedName(value = "indexes")
private List<Index> indexes = null;
// The schema change job will wait all transactions before this txn id finished, then send the schema change tasks.
@SerializedName(value = "watershedTxnId")
protected long watershedTxnId = -1;
@SerializedName(value = "storageFormat")
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;

View File

@ -17,6 +17,7 @@
package org.apache.doris.transaction;
import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@ -535,6 +536,7 @@ public class DatabaseTransactionMgr {
transactionState.prolongPublishTimeout();
}
// (TODO): ignore the alter index if txn id is less than sc sched watermark
int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partition.getId());
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
@ -553,6 +555,7 @@ public class DatabaseTransactionMgr {
throw new TransactionCommitFailedException("could not find replica for tablet ["
+ tabletId + "], backend [" + tabletBackend + "]");
}
// if the tablet have no replica's to commit or the tablet is a rolling up tablet,
// the commit backends maybe null
// if the commit backends is null, set all replicas as error replicas
@ -985,6 +988,7 @@ public class DatabaseTransactionMgr {
continue;
}
boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionId, table);
Iterator<PartitionCommitInfo> partitionCommitInfoIterator
= tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
while (partitionCommitInfoIterator.hasNext()) {
@ -1037,7 +1041,7 @@ public class DatabaseTransactionMgr {
tabletWriteFailedReplicas.clear();
tabletVersionFailedReplicas.clear();
for (Replica replica : tablet.getReplicas()) {
checkReplicaContinuousVersionSucc(tablet.getId(), replica,
checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn,
partitionCommitInfo.getVersion(), publishTasks.get(replica.getBackendId()),
errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas,
tabletVersionFailedReplicas);
@ -1132,8 +1136,24 @@ public class DatabaseTransactionMgr {
LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name());
}
private void checkReplicaContinuousVersionSucc(long tabletId, Replica replica, long version,
PublishVersionTask backendPublishTask, Set<Long> errorReplicaIds, List<Replica> tabletSuccReplicas,
private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable table) {
List<AlterJobV2> unfinishedAlterJobs = null;
if (table.getState() == OlapTable.OlapTableState.SCHEMA_CHANGE) {
unfinishedAlterJobs = Env.getCurrentEnv().getAlterInstance().getSchemaChangeHandler()
.getUnfinishedAlterJobV2ByTableId(table.getId());
} else if (table.getState() == OlapTable.OlapTableState.ROLLUP) {
unfinishedAlterJobs = Env.getCurrentEnv().getAlterInstance().getMaterializedViewHandler()
.getUnfinishedAlterJobV2ByTableId(table.getId());
} else {
return true;
}
return unfinishedAlterJobs.stream().allMatch(job -> transactionId > job.getWatershedTxnId());
}
private void checkReplicaContinuousVersionSucc(long tabletId, Replica replica, boolean alterReplicaLoadedTxn,
long version, PublishVersionTask backendPublishTask,
Set<Long> errorReplicaIds, List<Replica> tabletSuccReplicas,
List<Replica> tabletWriteFailedReplicas, List<Replica> tabletVersionFailedReplicas) {
if (backendPublishTask == null || !backendPublishTask.isFinished()) {
errorReplicaIds.add(replica.getId());
@ -1155,6 +1175,17 @@ public class DatabaseTransactionMgr {
}
}
// Schema change and rollup has a sched watermark,
// it's ensure that alter replicas will load those txns whose txn id > sched watermark.
// But for txns before the sched watermark, the alter replicas maynot load the txns,
// publish will ignore checking them and treat them as success in advance.
// Later be will fill the alter replicas's history data which before sched watermark.
// If failed to fill, fe will set the alter replica bad.
if (replica.getState() == Replica.ReplicaState.ALTER
&& (!alterReplicaLoadedTxn || !Config.publish_version_check_alter_replica)) {
errorReplicaIds.remove(replica.getId());
}
if (!errorReplicaIds.contains(replica.getId())) {
if (replica.checkVersionCatchUp(version - 1, true)) {
tabletSuccReplicas.add(replica);

View File

@ -0,0 +1,61 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_1_1 --
1 10
2 20
-- !select_2_1 --
1 10
2 20
-- !select_3_1 --
1 11 111
2 22 222
-- !select_4_1 --
1 10
2 20
-- !select_1_2 --
1 10
2 20
-- !select_2_2 --
1 10
2 20
-- !select_3_2 --
1 11 111
2 22 222
-- !select_4_2 --
1 10
2 20
-- !select_1_3 --
1 10 -1
2 20 -1
3 30 -1
4 40 -1
5 50 -1
-- !select_2_3 --
1 10
2 20
3 30
4 40
5 50
-- !select_3_3 --
1 11 111
2 22 222
3 33 333
4 44 444
5 55 555
-- !select_4_3 --
10 1
20 2
30 3
40 4
50 5

View File

@ -168,12 +168,7 @@ class Suite implements GroovyInterceptable {
try {
Thread.currentThread().setName(threadName == null ? originThreadName : threadName)
if (connInfo != null) {
def newConnInfo = new ConnectionInfo()
newConnInfo.conn = DriverManager.getConnection(connInfo.conn.getMetaData().getURL(),
connInfo.username, connInfo.password)
newConnInfo.username = connInfo.username
newConnInfo.password = connInfo.password
context.threadLocalConn.set(newConnInfo)
context.connectTo(connInfo.conn.getMetaData().getURL(), connInfo.username, connInfo.password);
}
context.scriptContext.eventListeners.each { it.onThreadStarted(context) }

View File

@ -250,6 +250,32 @@ class SuiteContext implements Closeable {
}
}
public void reconnectFe() {
ConnectionInfo connInfo = threadLocalConn.get()
if (connInfo == null) {
return
}
connectTo(connInfo.conn.getMetaData().getURL(), connInfo.username, connInfo.password);
}
public void connectTo(String url, String username, String password) {
ConnectionInfo oldConn = threadLocalConn.get()
if (oldConn != null) {
threadLocalConn.remove()
try {
oldConn.conn.close()
} catch (Throwable t) {
log.warn("Close connection failed", t)
}
}
def newConnInfo = new ConnectionInfo()
newConnInfo.conn = DriverManager.getConnection(url, username, password)
newConnInfo.username = username
newConnInfo.password = password
threadLocalConn.set(newConnInfo)
}
OutputUtils.OutputBlocksIterator getOutputIterator() {
def outputIt = threadLocalOutputIterator.get()
if (outputIt == null) {

View File

@ -0,0 +1,135 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.NodeType
suite('test_schema_change_concurrent_with_txn') {
def options = new ClusterOptions()
options.enableDebugPoints()
options.feConfigs.add('publish_wait_time_second=-1')
docker(options) {
sql 'SET GLOBAL insert_visible_timeout_ms = 2000'
def result = sql 'SELECT DATABASE()'
def dbName = result[0][0]
sql 'CREATE TABLE tbl_1 (k1 INT, k2 INT) PROPERTIES ( "light_schema_change" = "false")'
sql 'INSERT INTO tbl_1 VALUES (1, 10)'
sql 'INSERT INTO tbl_1 VALUES (2, 20)'
order_qt_select_1_1 'SELECT * FROM tbl_1'
sql 'CREATE TABLE tbl_2 AS SELECT * FROM tbl_1'
order_qt_select_2_1 'SELECT * FROM tbl_2'
sql 'CREATE TABLE tbl_3 (k1 INT, k2 INT, v INT SUM) AGGREGATE KEY (k1, k2)'
sql 'INSERT INTO tbl_3 VALUES (1, 11, 111)'
sql 'INSERT INTO tbl_3 VALUES (2, 22, 222)'
order_qt_select_3_1 'SELECT * FROM tbl_3'
sql 'CREATE TABLE tbl_4 (k1 INT, k2 INT)'
sql 'INSERT INTO tbl_4 VALUES (1, 10)'
sql 'INSERT INTO tbl_4 VALUES (2, 20)'
order_qt_select_4_1 'SELECT * FROM tbl_4'
// stop publish, insert succ, txn is commit but not visible
cluster.injectDebugPoints(NodeType.FE, ['PublishVersionDaemon.stop_publish':null])
sql 'INSERT INTO tbl_1 VALUES (3, 30)'
sql 'INSERT INTO tbl_1 VALUES (4, 40)'
order_qt_select_1_2 'SELECT * FROM tbl_1'
sql 'INSERT INTO tbl_2 VALUES (3, 30)'
sql 'INSERT INTO tbl_2 VALUES (4, 40)'
order_qt_select_2_2 'SELECT * FROM tbl_2'
sql 'INSERT INTO tbl_3 VALUES (3, 33, 333)'
sql 'INSERT INTO tbl_3 VALUES (4, 44, 444)'
order_qt_select_3_2 'SELECT * FROM tbl_3'
sql 'INSERT INTO tbl_4 VALUES (3, 30)'
sql 'INSERT INTO tbl_4 VALUES (4, 40)'
order_qt_select_4_2 'SELECT * FROM tbl_4'
result = sql 'SHOW PROC "/transactions"'
def runningTxn = result.find({ it[1].indexOf(dbName) > 0 })[2]
assertEquals(8, runningTxn as int)
sql "ALTER TABLE tbl_1 ADD COLUMN k3 INT DEFAULT '-1'"
sql 'CREATE MATERIALIZED VIEW tbl_2_mv AS SELECT k1, k1 + k2 FROM tbl_2'
sql 'ALTER TABLE tbl_3 ADD ROLLUP tbl_3_r1(k1, v)'
sql 'ALTER TABLE tbl_4 ORDER BY (k2, k1)'
sleep(5000)
def jobs = null
def scJobState = job -> job[9]
def rollupJobState = job -> job[8]
jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_1'"
assertEquals(1, jobs.size())
assertEquals('WAITING_TXN', scJobState(jobs[0]))
jobs = sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName = 'tbl_2'"
assertEquals(1, jobs.size())
assertEquals('WAITING_TXN', rollupJobState(jobs[0]))
jobs = sql "SHOW ALTER TABLE ROLLUP WHERE TableName = 'tbl_3'"
assertEquals(1, jobs.size())
assertEquals('WAITING_TXN', rollupJobState(jobs[0]))
jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_4'"
assertEquals(1, jobs.size())
assertEquals('WAITING_TXN', scJobState(jobs[0]))
sql 'INSERT INTO tbl_1(k1, k2) VALUES (5, 50)'
sql 'INSERT INTO tbl_2 VALUES (5, 50)'
sql 'INSERT INTO tbl_3 VALUES (5, 55, 555)'
sql 'INSERT INTO tbl_4(k1, k2) VALUES (5, 50)'
// After fe restart, transaction's loadedTblIndexes will clear,
// then fe will send publish task to all indexes.
// But the alter index may add after commit txn, then publish will failed.
cluster.restartFrontends()
sleep(30000)
context.reconnectFe()
//cluster.clearFrontendDebugPoints()
// should publish visible
order_qt_select_1_3 'SELECT * FROM tbl_1'
order_qt_select_2_3 'SELECT * FROM tbl_2'
order_qt_select_3_3 'SELECT * FROM tbl_3'
order_qt_select_4_3 'SELECT * FROM tbl_4'
jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_1'"
assertEquals(1, jobs.size())
assertEquals('FINISHED', scJobState(jobs[0]))
jobs = sql "SHOW ALTER TABLE MATERIALIZED VIEW WHERE TableName = 'tbl_2'"
assertEquals(1, jobs.size())
assertEquals('FINISHED', rollupJobState(jobs[0]))
jobs = sql "SHOW ALTER TABLE ROLLUP WHERE TableName = 'tbl_3'"
assertEquals(1, jobs.size())
assertEquals('FINISHED', rollupJobState(jobs[0]))
jobs = sql "SHOW ALTER TABLE COLUMN WHERE TableName = 'tbl_4'"
assertEquals(1, jobs.size())
assertEquals('FINISHED', scJobState(jobs[0]))
}
}