[fix](transaction) Fix publish txn wait too long when not meet quorum (#26659)
This commit is contained in:
@ -294,7 +294,7 @@ public class DatabaseTransactionMgr {
|
||||
info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
|
||||
info.add(TimeUtils.longToTimeString(txnState.getPreCommitTime()));
|
||||
info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
|
||||
info.add(TimeUtils.longToTimeString(txnState.getPublishVersionTime()));
|
||||
info.add(TimeUtils.longToTimeString(txnState.getLastPublishVersionTime()));
|
||||
info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
|
||||
info.add(txnState.getReason());
|
||||
info.add(String.valueOf(txnState.getErrorReplicas().size()));
|
||||
@ -944,10 +944,10 @@ public class DatabaseTransactionMgr {
|
||||
Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks();
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
long firstPublishOneSuccTime = transactionState.getFirstPublishOneSuccTime();
|
||||
long firstPublishVersionTime = transactionState.getFirstPublishVersionTime();
|
||||
boolean allowPublishOneSucc = false;
|
||||
if (Config.publish_wait_time_second > 0 && firstPublishOneSuccTime > 0
|
||||
&& now >= firstPublishOneSuccTime + Config.publish_wait_time_second * 1000L) {
|
||||
if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
|
||||
&& now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) {
|
||||
allowPublishOneSucc = true;
|
||||
}
|
||||
|
||||
@ -970,7 +970,6 @@ public class DatabaseTransactionMgr {
|
||||
tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
|
||||
PublishResult publishResult = PublishResult.QUORUM_SUCC;
|
||||
try {
|
||||
boolean allTabletsLeastOneSucc = true;
|
||||
Iterator<TableCommitInfo> tableCommitInfoIterator
|
||||
= transactionState.getIdToTableCommitInfos().values().iterator();
|
||||
while (tableCommitInfoIterator.hasNext()) {
|
||||
@ -1058,10 +1057,6 @@ public class DatabaseTransactionMgr {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (healthReplicaNum == 0) {
|
||||
allTabletsLeastOneSucc = false;
|
||||
}
|
||||
|
||||
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
|
||||
tabletVersionFailedReplicas);
|
||||
if (allowPublishOneSucc && healthReplicaNum > 0) {
|
||||
@ -1100,9 +1095,6 @@ public class DatabaseTransactionMgr {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (allTabletsLeastOneSucc && firstPublishOneSuccTime <= 0) {
|
||||
transactionState.setFirstPublishOneSuccTime(now);
|
||||
}
|
||||
if (publishResult == PublishResult.FAILED) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -121,7 +121,7 @@ public class PublishVersionDaemon extends MasterDaemon {
|
||||
batchTask.addTask(task);
|
||||
transactionState.addPublishVersionTask(backendId, task);
|
||||
}
|
||||
transactionState.setHasSendTask(true);
|
||||
transactionState.setSendedTask();
|
||||
LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(),
|
||||
transactionState.getDbId());
|
||||
}
|
||||
@ -174,7 +174,7 @@ public class PublishVersionDaemon extends MasterDaemon {
|
||||
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
|
||||
}
|
||||
if (MetricRepo.isInit) {
|
||||
long publishTime = transactionState.getPublishVersionTime() - transactionState.getCommitTime();
|
||||
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
|
||||
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,13 +220,14 @@ public class TransactionState implements Writable {
|
||||
// this state need not be serialized
|
||||
private Map<Long, PublishVersionTask> publishVersionTasks;
|
||||
private boolean hasSendTask;
|
||||
private long publishVersionTime = -1;
|
||||
private TransactionStatus preStatus = null;
|
||||
|
||||
// When publish txn, if every tablet has at least 1 replica published succ, but not quorum replicas succ,
|
||||
// and time since firstPublishOneSuccTime has exceeded Config.publish_wait_time_second,
|
||||
// and time since firstPublishVersionTime has exceeded Config.publish_wait_time_second,
|
||||
// then this transaction will become visible.
|
||||
private long firstPublishOneSuccTime = -1;
|
||||
private long firstPublishVersionTime = -1;
|
||||
|
||||
private long lastPublishVersionTime = -1;
|
||||
|
||||
@SerializedName(value = "callbackId")
|
||||
private long callbackId = -1;
|
||||
@ -339,17 +340,24 @@ public class TransactionState implements Writable {
|
||||
this.publishVersionTasks.put(backendId, task);
|
||||
}
|
||||
|
||||
public void setHasSendTask(boolean hasSendTask) {
|
||||
this.hasSendTask = hasSendTask;
|
||||
this.publishVersionTime = System.currentTimeMillis();
|
||||
public void setSendedTask() {
|
||||
this.hasSendTask = true;
|
||||
updateSendTaskTime();
|
||||
}
|
||||
|
||||
public void updateSendTaskTime() {
|
||||
this.publishVersionTime = System.currentTimeMillis();
|
||||
this.lastPublishVersionTime = System.currentTimeMillis();
|
||||
if (this.firstPublishVersionTime <= 0) {
|
||||
this.firstPublishVersionTime = lastPublishVersionTime;
|
||||
}
|
||||
}
|
||||
|
||||
public long getPublishVersionTime() {
|
||||
return this.publishVersionTime;
|
||||
public long getFirstPublishVersionTime() {
|
||||
return firstPublishVersionTime;
|
||||
}
|
||||
|
||||
public long getLastPublishVersionTime() {
|
||||
return this.lastPublishVersionTime;
|
||||
}
|
||||
|
||||
public boolean hasSendTask() {
|
||||
@ -420,14 +428,6 @@ public class TransactionState implements Writable {
|
||||
return errorLogUrl;
|
||||
}
|
||||
|
||||
public long getFirstPublishOneSuccTime() {
|
||||
return firstPublishOneSuccTime;
|
||||
}
|
||||
|
||||
public void setFirstPublishOneSuccTime(long firstPublishOneSuccTime) {
|
||||
this.firstPublishOneSuccTime = firstPublishOneSuccTime;
|
||||
}
|
||||
|
||||
public void setTransactionStatus(TransactionStatus transactionStatus) {
|
||||
// status changed
|
||||
this.preStatus = this.transactionStatus;
|
||||
@ -646,7 +646,7 @@ public class TransactionState implements Writable {
|
||||
if (prolongPublishTimeout) {
|
||||
timeoutMillis *= 2;
|
||||
}
|
||||
return System.currentTimeMillis() - publishVersionTime > timeoutMillis;
|
||||
return System.currentTimeMillis() - lastPublishVersionTime > timeoutMillis;
|
||||
}
|
||||
|
||||
public void prolongPublishTimeout() {
|
||||
|
||||
10
regression-test/data/load/insert/test_publish_one_succ.out
Normal file
10
regression-test/data/load/insert/test_publish_one_succ.out
Normal file
@ -0,0 +1,10 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !select_1 --
|
||||
|
||||
-- !select_2 --
|
||||
1 10
|
||||
2 20
|
||||
3 30
|
||||
4 40
|
||||
5 50
|
||||
|
||||
@ -0,0 +1,45 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import org.apache.doris.regression.suite.ClusterOptions
|
||||
import org.apache.doris.regression.util.NodeType
|
||||
|
||||
suite('test_publish_one_succ') {
|
||||
def options = new ClusterOptions()
|
||||
options.enableDebugPoints()
|
||||
docker(options) {
|
||||
cluster.injectDebugPoints(NodeType.FE, ['PublishVersionDaemon.stop_publish':null])
|
||||
|
||||
sql 'SET GLOBAL insert_visible_timeout_ms = 1000'
|
||||
sql "ADMIN SET FRONTEND CONFIG ('publish_wait_time_second' = '1000000')"
|
||||
sql 'CREATE TABLE tbl (k1 int, k2 int)'
|
||||
for (def i = 1; i <= 5; i++) {
|
||||
sql "INSERT INTO tbl VALUES (${i}, ${10 * i})"
|
||||
}
|
||||
|
||||
cluster.stopBackends(2, 3)
|
||||
cluster.checkBeIsAlive(2, false)
|
||||
cluster.checkBeIsAlive(3, false)
|
||||
cluster.clearFrontendDebugPoints()
|
||||
|
||||
sleep(1000)
|
||||
order_qt_select_1 'SELECT * FROM tbl'
|
||||
sql "ADMIN SET FRONTEND CONFIG ('publish_wait_time_second' = '2')"
|
||||
sleep(2000)
|
||||
order_qt_select_2 'SELECT * FROM tbl'
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user