[improvement](transaction) reduce publish txn log (#28277)
This commit is contained in:
@ -456,6 +456,10 @@ public class Config extends ConfigBase {
|
||||
+ " dead lock" })
|
||||
public static boolean publish_version_check_alter_replica = true;
|
||||
|
||||
@ConfField(mutable = true, masterOnly = true, description = {"单个事务 publish 失败打日志间隔",
|
||||
"print log interval for publish transaction failed interval"})
|
||||
public static long publish_fail_log_interval_second = 5 * 60;
|
||||
|
||||
@ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。"
|
||||
+ "该参数仅用于事务型 insert 操作中。",
|
||||
"Maximal waiting time for all data inserted before one transaction to be committed, in seconds. "
|
||||
|
||||
@ -948,19 +948,8 @@ public class DatabaseTransactionMgr {
|
||||
|
||||
// add all commit errors and publish errors to a single set
|
||||
Set<Long> errorReplicaIds = transactionState.getErrorReplicas();
|
||||
Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks();
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
long firstPublishVersionTime = transactionState.getFirstPublishVersionTime();
|
||||
boolean allowPublishOneSucc = false;
|
||||
if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
|
||||
&& now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) {
|
||||
allowPublishOneSucc = true;
|
||||
}
|
||||
|
||||
List<Replica> tabletSuccReplicas = Lists.newArrayList();
|
||||
List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
|
||||
List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
|
||||
List<Pair<OlapTable, Partition>> relatedTblPartitions = Lists.newArrayList();
|
||||
|
||||
// case 1 If database is dropped, then we just throw MetaNotFoundException, because all related tables are
|
||||
// already force dropped, we just ignore the transaction with all tables been force dropped.
|
||||
@ -975,134 +964,12 @@ public class DatabaseTransactionMgr {
|
||||
LOG.debug("finish transaction {} with tables {}", transactionId, tableIdList);
|
||||
List<? extends TableIf> tableList = db.getTablesOnIdOrderIfExist(tableIdList);
|
||||
tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
|
||||
PublishResult publishResult = PublishResult.QUORUM_SUCC;
|
||||
PublishResult publishResult;
|
||||
try {
|
||||
Iterator<TableCommitInfo> tableCommitInfoIterator
|
||||
= transactionState.getIdToTableCommitInfos().values().iterator();
|
||||
while (tableCommitInfoIterator.hasNext()) {
|
||||
TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
|
||||
long tableId = tableCommitInfo.getTableId();
|
||||
OlapTable table = (OlapTable) db.getTableNullable(tableId);
|
||||
// table maybe dropped between commit and publish, ignore this error
|
||||
if (table == null) {
|
||||
tableCommitInfoIterator.remove();
|
||||
LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}",
|
||||
tableId,
|
||||
transactionState);
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionId, table);
|
||||
Iterator<PartitionCommitInfo> partitionCommitInfoIterator
|
||||
= tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
|
||||
while (partitionCommitInfoIterator.hasNext()) {
|
||||
PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
|
||||
long partitionId = partitionCommitInfo.getPartitionId();
|
||||
Partition partition = table.getPartition(partitionId);
|
||||
// partition maybe dropped between commit and publish version, ignore this error
|
||||
if (partition == null) {
|
||||
partitionCommitInfoIterator.remove();
|
||||
LOG.warn("partition {} is dropped, skip version check"
|
||||
+ " and remove it from transaction state {}", partitionId, transactionState);
|
||||
continue;
|
||||
}
|
||||
if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) {
|
||||
LOG.debug("transactionId {} partition commitInfo version {} is not equal with "
|
||||
+ "partition visible version {} plus one, need wait",
|
||||
transactionId,
|
||||
partitionCommitInfo.getVersion(),
|
||||
partition.getVisibleVersion());
|
||||
String errMsg = String.format("wait for publishing partition %d version %d."
|
||||
+ " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1,
|
||||
partitionCommitInfo.getVersion(), tableId);
|
||||
transactionState.setErrorMsg(errMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId);
|
||||
List<MaterializedIndex> allIndices;
|
||||
if (transactionState.getLoadedTblIndexes().isEmpty()) {
|
||||
allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
|
||||
} else {
|
||||
allIndices = Lists.newArrayList();
|
||||
for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
|
||||
MaterializedIndex index = partition.getIndex(indexId);
|
||||
if (index != null) {
|
||||
allIndices.add(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check success replica number for each tablet.
|
||||
// a success replica means:
|
||||
// 1. Not in errorReplicaIds: succeed in both commit and publish phase
|
||||
// 2. last failed version < 0: is a health replica before
|
||||
// 3. version catch up: not with a stale version
|
||||
// Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
|
||||
for (MaterializedIndex index : allIndices) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
tabletSuccReplicas.clear();
|
||||
tabletWriteFailedReplicas.clear();
|
||||
tabletVersionFailedReplicas.clear();
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn,
|
||||
partitionCommitInfo.getVersion(), publishTasks.get(replica.getBackendId()),
|
||||
errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas,
|
||||
tabletVersionFailedReplicas);
|
||||
}
|
||||
|
||||
int healthReplicaNum = tabletSuccReplicas.size();
|
||||
if (healthReplicaNum >= loadRequiredReplicaNum) {
|
||||
if (!tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty()) {
|
||||
String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
|
||||
tabletWriteFailedReplicas, tabletVersionFailedReplicas);
|
||||
LOG.info("publish version quorum succ for transaction {} on tablet {} with version"
|
||||
+ " {}, and has failed replicas, load require replica num {}. table {}, "
|
||||
+ "partition {}, tablet detail: {}",
|
||||
transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
|
||||
loadRequiredReplicaNum, tableId, partitionId, writeDetail);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
|
||||
tabletVersionFailedReplicas);
|
||||
if (allowPublishOneSucc && healthReplicaNum > 0) {
|
||||
if (publishResult == PublishResult.QUORUM_SUCC) {
|
||||
publishResult = PublishResult.TIMEOUT_SUCC;
|
||||
}
|
||||
// We can not do any thing except retrying,
|
||||
// because publish task is assigned a version,
|
||||
// and doris does not permit discontinuous
|
||||
// versions.
|
||||
//
|
||||
// If a timeout happens, it means that the rowset
|
||||
// that are being publised exists on a few replicas we should go
|
||||
// ahead, otherwise data may be lost and thre
|
||||
// publish task hangs forever.
|
||||
LOG.info("publish version timeout succ for transaction {} on tablet {} with version"
|
||||
+ " {}, and has failed replicas, load require replica num {}. table {}, "
|
||||
+ "partition {}, tablet detail: {}",
|
||||
transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
|
||||
loadRequiredReplicaNum, tableId, partitionId, writeDetail);
|
||||
} else {
|
||||
publishResult = PublishResult.FAILED;
|
||||
String errMsg = String.format("publish on tablet %d failed."
|
||||
+ " succeed replica num %d < load required replica num %d."
|
||||
+ " table: %d, partition: %d, publish version: %d",
|
||||
tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId,
|
||||
partitionId, partition.getVisibleVersion() + 1);
|
||||
transactionState.setErrorMsg(errMsg);
|
||||
LOG.info("publish version failed for transaction {} on tablet {} with version"
|
||||
+ " {}, and has failed replicas, load required replica num {}. table {}, "
|
||||
+ "partition {}, tablet detail: {}",
|
||||
transactionState, tablet.getId(), partitionCommitInfo.getVersion(),
|
||||
loadRequiredReplicaNum, tableId, partitionId, writeDetail);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!finishCheckPartitionVersion(transactionState, db, relatedTblPartitions)) {
|
||||
return;
|
||||
}
|
||||
publishResult = finishCheckQuorumReplicas(transactionState, relatedTblPartitions, errorReplicaIds);
|
||||
if (publishResult == PublishResult.FAILED) {
|
||||
return;
|
||||
}
|
||||
@ -1137,7 +1004,181 @@ public class DatabaseTransactionMgr {
|
||||
// Otherwise, there is no way for stream load to query the result right after loading finished,
|
||||
// even if we call "sync" before querying.
|
||||
transactionState.countdownVisibleLatch();
|
||||
LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name());
|
||||
LOG.info("finish transaction {} successfully, publish times {}, publish result {}",
|
||||
transactionState, transactionState.getPublishCount(), publishResult.name());
|
||||
}
|
||||
|
||||
private boolean finishCheckPartitionVersion(TransactionState transactionState, Database db,
|
||||
List<Pair<OlapTable, Partition>> relatedTblPartitions) {
|
||||
Iterator<TableCommitInfo> tableCommitInfoIterator
|
||||
= transactionState.getIdToTableCommitInfos().values().iterator();
|
||||
while (tableCommitInfoIterator.hasNext()) {
|
||||
TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next();
|
||||
long tableId = tableCommitInfo.getTableId();
|
||||
OlapTable table = (OlapTable) db.getTableNullable(tableId);
|
||||
// table maybe dropped between commit and publish, ignore this error
|
||||
if (table == null) {
|
||||
tableCommitInfoIterator.remove();
|
||||
LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}",
|
||||
tableId,
|
||||
transactionState);
|
||||
continue;
|
||||
}
|
||||
|
||||
Iterator<PartitionCommitInfo> partitionCommitInfoIterator
|
||||
= tableCommitInfo.getIdToPartitionCommitInfo().values().iterator();
|
||||
while (partitionCommitInfoIterator.hasNext()) {
|
||||
PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next();
|
||||
long partitionId = partitionCommitInfo.getPartitionId();
|
||||
Partition partition = table.getPartition(partitionId);
|
||||
// partition maybe dropped between commit and publish version, ignore this error
|
||||
if (partition == null) {
|
||||
partitionCommitInfoIterator.remove();
|
||||
LOG.warn("partition {} is dropped, skip version check"
|
||||
+ " and remove it from transaction state {}", partitionId, transactionState);
|
||||
continue;
|
||||
}
|
||||
if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) {
|
||||
LOG.debug("for table {} partition {}, transactionId {} partition commitInfo version {} is not"
|
||||
+ " equal with partition visible version {} plus one, need wait",
|
||||
table.getId(), partition.getId(), transactionState.getTransactionId(),
|
||||
partitionCommitInfo.getVersion(), partition.getVisibleVersion());
|
||||
String errMsg = String.format("wait for publishing partition %d version %d."
|
||||
+ " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1,
|
||||
partitionCommitInfo.getVersion(), tableId);
|
||||
transactionState.setErrorMsg(errMsg);
|
||||
return false;
|
||||
}
|
||||
|
||||
relatedTblPartitions.add(Pair.of(table, partition));
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private PublishResult finishCheckQuorumReplicas(TransactionState transactionState,
|
||||
List<Pair<OlapTable, Partition>> relatedTblPartitions,
|
||||
Set<Long> errorReplicaIds) {
|
||||
long now = System.currentTimeMillis();
|
||||
long firstPublishVersionTime = transactionState.getFirstPublishVersionTime();
|
||||
boolean allowPublishOneSucc = false;
|
||||
if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0
|
||||
&& now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) {
|
||||
allowPublishOneSucc = true;
|
||||
}
|
||||
|
||||
List<Replica> tabletSuccReplicas = Lists.newArrayList();
|
||||
List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
|
||||
List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
|
||||
List<String> logs = Lists.newArrayList();
|
||||
|
||||
Map<Long, PublishVersionTask> publishTasks = transactionState.getPublishVersionTasks();
|
||||
PublishResult publishResult = PublishResult.QUORUM_SUCC;
|
||||
for (Pair<OlapTable, Partition> pair : relatedTblPartitions) {
|
||||
OlapTable table = pair.key();
|
||||
Partition partition = pair.value();
|
||||
long tableId = table.getId();
|
||||
long partitionId = partition.getId();
|
||||
long newVersion = partition.getVisibleVersion() + 1;
|
||||
int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId);
|
||||
List<MaterializedIndex> allIndices;
|
||||
if (transactionState.getLoadedTblIndexes().isEmpty()) {
|
||||
allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
|
||||
} else {
|
||||
allIndices = Lists.newArrayList();
|
||||
for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) {
|
||||
MaterializedIndex index = partition.getIndex(indexId);
|
||||
if (index != null) {
|
||||
allIndices.add(index);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionState.getTransactionId(), table);
|
||||
|
||||
// check success replica number for each tablet.
|
||||
// a success replica means:
|
||||
// 1. Not in errorReplicaIds: succeed in both commit and publish phase
|
||||
// 2. last failed version < 0: is a health replica before
|
||||
// 3. version catch up: not with a stale version
|
||||
// Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
|
||||
for (MaterializedIndex index : allIndices) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
tabletSuccReplicas.clear();
|
||||
tabletWriteFailedReplicas.clear();
|
||||
tabletVersionFailedReplicas.clear();
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn,
|
||||
newVersion, publishTasks.get(replica.getBackendId()),
|
||||
errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas,
|
||||
tabletVersionFailedReplicas);
|
||||
}
|
||||
|
||||
int healthReplicaNum = tabletSuccReplicas.size();
|
||||
if (healthReplicaNum >= loadRequiredReplicaNum) {
|
||||
boolean hasFailedReplica = !tabletWriteFailedReplicas.isEmpty()
|
||||
|| !tabletVersionFailedReplicas.isEmpty();
|
||||
if (hasFailedReplica) {
|
||||
String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
|
||||
tabletWriteFailedReplicas, tabletVersionFailedReplicas);
|
||||
logs.add(String.format("publish version quorum succ for transaction %s on tablet %s"
|
||||
+ " with version %s, and has failed replicas, load require replica num %s. "
|
||||
+ "table %s, partition %s, tablet detail: %s",
|
||||
transactionState, tablet.getId(), newVersion,
|
||||
loadRequiredReplicaNum, tableId, partitionId, writeDetail));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
|
||||
tabletVersionFailedReplicas);
|
||||
if (allowPublishOneSucc && healthReplicaNum > 0) {
|
||||
if (publishResult == PublishResult.QUORUM_SUCC) {
|
||||
publishResult = PublishResult.TIMEOUT_SUCC;
|
||||
}
|
||||
// We can not do any thing except retrying,
|
||||
// because publish task is assigned a version,
|
||||
// and doris does not permit discontinuous
|
||||
// versions.
|
||||
//
|
||||
// If a timeout happens, it means that the rowset
|
||||
// that are being publised exists on a few replicas we should go
|
||||
// ahead, otherwise data may be lost and thre
|
||||
// publish task hangs forever.
|
||||
logs.add(String.format("publish version timeout succ for transaction %s on tablet %s "
|
||||
+ "with version %s, and has failed replicas, load require replica num %s. "
|
||||
+ "table %s, partition %s, tablet detail: %s",
|
||||
transactionState, tablet.getId(), newVersion,
|
||||
loadRequiredReplicaNum, tableId, partitionId, writeDetail));
|
||||
} else {
|
||||
publishResult = PublishResult.FAILED;
|
||||
String errMsg = String.format("publish on tablet %d failed."
|
||||
+ " succeed replica num %d < load required replica num %d."
|
||||
+ " table: %d, partition: %d, publish version: %d",
|
||||
tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId,
|
||||
partitionId, newVersion);
|
||||
transactionState.setErrorMsg(errMsg);
|
||||
logs.add(String.format("publish version failed for transaction %s on tablet %s with version"
|
||||
+ " %s, and has failed replicas, load required replica num %s. table %s, "
|
||||
+ "partition %s, tablet detail: %s",
|
||||
transactionState, tablet.getId(), newVersion,
|
||||
loadRequiredReplicaNum, tableId, partitionId, writeDetail));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean needLog = publishResult != PublishResult.FAILED
|
||||
|| now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L;
|
||||
if (needLog) {
|
||||
transactionState.setLastPublishLogTime(now);
|
||||
for (String log : logs) {
|
||||
LOG.info("{}. publish times {}", log, transactionState.getPublishCount());
|
||||
}
|
||||
}
|
||||
|
||||
return publishResult;
|
||||
}
|
||||
|
||||
private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable table) {
|
||||
|
||||
@ -229,6 +229,12 @@ public class TransactionState implements Writable {
|
||||
|
||||
private long lastPublishVersionTime = -1;
|
||||
|
||||
private long publishCount = 0;
|
||||
|
||||
// txn may try finish many times and generate a lot of log.
|
||||
// use lastPublishLogTime to reduce log.
|
||||
private long lastPublishLogTime = 0;
|
||||
|
||||
@SerializedName(value = "callbackId")
|
||||
private long callbackId = -1;
|
||||
|
||||
@ -346,6 +352,7 @@ public class TransactionState implements Writable {
|
||||
}
|
||||
|
||||
public void updateSendTaskTime() {
|
||||
this.publishCount++;
|
||||
this.lastPublishVersionTime = System.currentTimeMillis();
|
||||
if (this.firstPublishVersionTime <= 0) {
|
||||
this.firstPublishVersionTime = lastPublishVersionTime;
|
||||
@ -360,6 +367,10 @@ public class TransactionState implements Writable {
|
||||
return this.lastPublishVersionTime;
|
||||
}
|
||||
|
||||
public long getPublishCount() {
|
||||
return publishCount;
|
||||
}
|
||||
|
||||
public boolean hasSendTask() {
|
||||
return this.hasSendTask;
|
||||
}
|
||||
@ -428,6 +439,14 @@ public class TransactionState implements Writable {
|
||||
return errorLogUrl;
|
||||
}
|
||||
|
||||
public long getLastPublishLogTime() {
|
||||
return lastPublishLogTime;
|
||||
}
|
||||
|
||||
public void setLastPublishLogTime(long lastPublishLogTime) {
|
||||
this.lastPublishLogTime = lastPublishLogTime;
|
||||
}
|
||||
|
||||
public void setTransactionStatus(TransactionStatus transactionStatus) {
|
||||
// status changed
|
||||
this.preStatus = this.transactionStatus;
|
||||
|
||||
Reference in New Issue
Block a user