From 044d7830c9e162ec59207629dccc0a68c50e82cb Mon Sep 17 00:00:00 2001 From: yujun Date: Wed, 20 Dec 2023 09:31:21 +0800 Subject: [PATCH] [improvement](transaction) reduce publish txn log (#28277) --- .../java/org/apache/doris/common/Config.java | 4 + .../transaction/DatabaseTransactionMgr.java | 319 ++++++++++-------- .../doris/transaction/TransactionState.java | 19 ++ 3 files changed, 203 insertions(+), 139 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index f0d1930ee5..acb82ae7d1 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -456,6 +456,10 @@ public class Config extends ConfigBase { + " dead lock" }) public static boolean publish_version_check_alter_replica = true; + @ConfField(mutable = true, masterOnly = true, description = {"单个事务 publish 失败打日志间隔", + "print log interval for publish transaction failed interval"}) + public static long publish_fail_log_interval_second = 5 * 60; + @ConfField(mutable = true, masterOnly = true, description = {"提交事务的最大超时时间,单位是秒。" + "该参数仅用于事务型 insert 操作中。", "Maximal waiting time for all data inserted before one transaction to be committed, in seconds. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 6ef7d3e70a..29471255f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -948,19 +948,8 @@ public class DatabaseTransactionMgr { // add all commit errors and publish errors to a single set Set errorReplicaIds = transactionState.getErrorReplicas(); - Map publishTasks = transactionState.getPublishVersionTasks(); - long now = System.currentTimeMillis(); - long firstPublishVersionTime = transactionState.getFirstPublishVersionTime(); - boolean allowPublishOneSucc = false; - if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0 - && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) { - allowPublishOneSucc = true; - } - - List tabletSuccReplicas = Lists.newArrayList(); - List tabletWriteFailedReplicas = Lists.newArrayList(); - List tabletVersionFailedReplicas = Lists.newArrayList(); + List> relatedTblPartitions = Lists.newArrayList(); // case 1 If database is dropped, then we just throw MetaNotFoundException, because all related tables are // already force dropped, we just ignore the transaction with all tables been force dropped. @@ -975,134 +964,12 @@ public class DatabaseTransactionMgr { LOG.debug("finish transaction {} with tables {}", transactionId, tableIdList); List tableList = db.getTablesOnIdOrderIfExist(tableIdList); tableList = MetaLockUtils.writeLockTablesIfExist(tableList); - PublishResult publishResult = PublishResult.QUORUM_SUCC; + PublishResult publishResult; try { - Iterator tableCommitInfoIterator - = transactionState.getIdToTableCommitInfos().values().iterator(); - while (tableCommitInfoIterator.hasNext()) { - TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next(); - long tableId = tableCommitInfo.getTableId(); - OlapTable table = (OlapTable) db.getTableNullable(tableId); - // table maybe dropped between commit and publish, ignore this error - if (table == null) { - tableCommitInfoIterator.remove(); - LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}", - tableId, - transactionState); - continue; - } - - boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionId, table); - Iterator partitionCommitInfoIterator - = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator(); - while (partitionCommitInfoIterator.hasNext()) { - PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next(); - long partitionId = partitionCommitInfo.getPartitionId(); - Partition partition = table.getPartition(partitionId); - // partition maybe dropped between commit and publish version, ignore this error - if (partition == null) { - partitionCommitInfoIterator.remove(); - LOG.warn("partition {} is dropped, skip version check" - + " and remove it from transaction state {}", partitionId, transactionState); - continue; - } - if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) { - LOG.debug("transactionId {} partition commitInfo version {} is not equal with " - + "partition visible version {} plus one, need wait", - transactionId, - partitionCommitInfo.getVersion(), - partition.getVisibleVersion()); - String errMsg = String.format("wait for publishing partition %d version %d." - + " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1, - partitionCommitInfo.getVersion(), tableId); - transactionState.setErrorMsg(errMsg); - return; - } - - int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId); - List allIndices; - if (transactionState.getLoadedTblIndexes().isEmpty()) { - allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); - } else { - allIndices = Lists.newArrayList(); - for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) { - MaterializedIndex index = partition.getIndex(indexId); - if (index != null) { - allIndices.add(index); - } - } - } - - // check success replica number for each tablet. - // a success replica means: - // 1. Not in errorReplicaIds: succeed in both commit and publish phase - // 2. last failed version < 0: is a health replica before - // 3. version catch up: not with a stale version - // Here we only check number, the replica version will be updated in updateCatalogAfterVisible() - for (MaterializedIndex index : allIndices) { - for (Tablet tablet : index.getTablets()) { - tabletSuccReplicas.clear(); - tabletWriteFailedReplicas.clear(); - tabletVersionFailedReplicas.clear(); - for (Replica replica : tablet.getReplicas()) { - checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn, - partitionCommitInfo.getVersion(), publishTasks.get(replica.getBackendId()), - errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas, - tabletVersionFailedReplicas); - } - - int healthReplicaNum = tabletSuccReplicas.size(); - if (healthReplicaNum >= loadRequiredReplicaNum) { - if (!tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty()) { - String writeDetail = getTabletWriteDetail(tabletSuccReplicas, - tabletWriteFailedReplicas, tabletVersionFailedReplicas); - LOG.info("publish version quorum succ for transaction {} on tablet {} with version" - + " {}, and has failed replicas, load require replica num {}. table {}, " - + "partition {}, tablet detail: {}", - transactionState, tablet.getId(), partitionCommitInfo.getVersion(), - loadRequiredReplicaNum, tableId, partitionId, writeDetail); - } - continue; - } - - String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, - tabletVersionFailedReplicas); - if (allowPublishOneSucc && healthReplicaNum > 0) { - if (publishResult == PublishResult.QUORUM_SUCC) { - publishResult = PublishResult.TIMEOUT_SUCC; - } - // We can not do any thing except retrying, - // because publish task is assigned a version, - // and doris does not permit discontinuous - // versions. - // - // If a timeout happens, it means that the rowset - // that are being publised exists on a few replicas we should go - // ahead, otherwise data may be lost and thre - // publish task hangs forever. - LOG.info("publish version timeout succ for transaction {} on tablet {} with version" - + " {}, and has failed replicas, load require replica num {}. table {}, " - + "partition {}, tablet detail: {}", - transactionState, tablet.getId(), partitionCommitInfo.getVersion(), - loadRequiredReplicaNum, tableId, partitionId, writeDetail); - } else { - publishResult = PublishResult.FAILED; - String errMsg = String.format("publish on tablet %d failed." - + " succeed replica num %d < load required replica num %d." - + " table: %d, partition: %d, publish version: %d", - tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId, - partitionId, partition.getVisibleVersion() + 1); - transactionState.setErrorMsg(errMsg); - LOG.info("publish version failed for transaction {} on tablet {} with version" - + " {}, and has failed replicas, load required replica num {}. table {}, " - + "partition {}, tablet detail: {}", - transactionState, tablet.getId(), partitionCommitInfo.getVersion(), - loadRequiredReplicaNum, tableId, partitionId, writeDetail); - } - } - } - } + if (!finishCheckPartitionVersion(transactionState, db, relatedTblPartitions)) { + return; } + publishResult = finishCheckQuorumReplicas(transactionState, relatedTblPartitions, errorReplicaIds); if (publishResult == PublishResult.FAILED) { return; } @@ -1137,7 +1004,181 @@ public class DatabaseTransactionMgr { // Otherwise, there is no way for stream load to query the result right after loading finished, // even if we call "sync" before querying. transactionState.countdownVisibleLatch(); - LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name()); + LOG.info("finish transaction {} successfully, publish times {}, publish result {}", + transactionState, transactionState.getPublishCount(), publishResult.name()); + } + + private boolean finishCheckPartitionVersion(TransactionState transactionState, Database db, + List> relatedTblPartitions) { + Iterator tableCommitInfoIterator + = transactionState.getIdToTableCommitInfos().values().iterator(); + while (tableCommitInfoIterator.hasNext()) { + TableCommitInfo tableCommitInfo = tableCommitInfoIterator.next(); + long tableId = tableCommitInfo.getTableId(); + OlapTable table = (OlapTable) db.getTableNullable(tableId); + // table maybe dropped between commit and publish, ignore this error + if (table == null) { + tableCommitInfoIterator.remove(); + LOG.warn("table {} is dropped, skip version check and remove it from transaction state {}", + tableId, + transactionState); + continue; + } + + Iterator partitionCommitInfoIterator + = tableCommitInfo.getIdToPartitionCommitInfo().values().iterator(); + while (partitionCommitInfoIterator.hasNext()) { + PartitionCommitInfo partitionCommitInfo = partitionCommitInfoIterator.next(); + long partitionId = partitionCommitInfo.getPartitionId(); + Partition partition = table.getPartition(partitionId); + // partition maybe dropped between commit and publish version, ignore this error + if (partition == null) { + partitionCommitInfoIterator.remove(); + LOG.warn("partition {} is dropped, skip version check" + + " and remove it from transaction state {}", partitionId, transactionState); + continue; + } + if (partition.getVisibleVersion() != partitionCommitInfo.getVersion() - 1) { + LOG.debug("for table {} partition {}, transactionId {} partition commitInfo version {} is not" + + " equal with partition visible version {} plus one, need wait", + table.getId(), partition.getId(), transactionState.getTransactionId(), + partitionCommitInfo.getVersion(), partition.getVisibleVersion()); + String errMsg = String.format("wait for publishing partition %d version %d." + + " self version: %d. table %d", partitionId, partition.getVisibleVersion() + 1, + partitionCommitInfo.getVersion(), tableId); + transactionState.setErrorMsg(errMsg); + return false; + } + + relatedTblPartitions.add(Pair.of(table, partition)); + } + } + + return true; + } + + private PublishResult finishCheckQuorumReplicas(TransactionState transactionState, + List> relatedTblPartitions, + Set errorReplicaIds) { + long now = System.currentTimeMillis(); + long firstPublishVersionTime = transactionState.getFirstPublishVersionTime(); + boolean allowPublishOneSucc = false; + if (Config.publish_wait_time_second > 0 && firstPublishVersionTime > 0 + && now >= firstPublishVersionTime + Config.publish_wait_time_second * 1000L) { + allowPublishOneSucc = true; + } + + List tabletSuccReplicas = Lists.newArrayList(); + List tabletWriteFailedReplicas = Lists.newArrayList(); + List tabletVersionFailedReplicas = Lists.newArrayList(); + List logs = Lists.newArrayList(); + + Map publishTasks = transactionState.getPublishVersionTasks(); + PublishResult publishResult = PublishResult.QUORUM_SUCC; + for (Pair pair : relatedTblPartitions) { + OlapTable table = pair.key(); + Partition partition = pair.value(); + long tableId = table.getId(); + long partitionId = partition.getId(); + long newVersion = partition.getVisibleVersion() + 1; + int loadRequiredReplicaNum = table.getLoadRequiredReplicaNum(partitionId); + List allIndices; + if (transactionState.getLoadedTblIndexes().isEmpty()) { + allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL); + } else { + allIndices = Lists.newArrayList(); + for (long indexId : transactionState.getLoadedTblIndexes().get(tableId)) { + MaterializedIndex index = partition.getIndex(indexId); + if (index != null) { + allIndices.add(index); + } + } + } + + boolean alterReplicaLoadedTxn = isAlterReplicaLoadedTxn(transactionState.getTransactionId(), table); + + // check success replica number for each tablet. + // a success replica means: + // 1. Not in errorReplicaIds: succeed in both commit and publish phase + // 2. last failed version < 0: is a health replica before + // 3. version catch up: not with a stale version + // Here we only check number, the replica version will be updated in updateCatalogAfterVisible() + for (MaterializedIndex index : allIndices) { + for (Tablet tablet : index.getTablets()) { + tabletSuccReplicas.clear(); + tabletWriteFailedReplicas.clear(); + tabletVersionFailedReplicas.clear(); + for (Replica replica : tablet.getReplicas()) { + checkReplicaContinuousVersionSucc(tablet.getId(), replica, alterReplicaLoadedTxn, + newVersion, publishTasks.get(replica.getBackendId()), + errorReplicaIds, tabletSuccReplicas, tabletWriteFailedReplicas, + tabletVersionFailedReplicas); + } + + int healthReplicaNum = tabletSuccReplicas.size(); + if (healthReplicaNum >= loadRequiredReplicaNum) { + boolean hasFailedReplica = !tabletWriteFailedReplicas.isEmpty() + || !tabletVersionFailedReplicas.isEmpty(); + if (hasFailedReplica) { + String writeDetail = getTabletWriteDetail(tabletSuccReplicas, + tabletWriteFailedReplicas, tabletVersionFailedReplicas); + logs.add(String.format("publish version quorum succ for transaction %s on tablet %s" + + " with version %s, and has failed replicas, load require replica num %s. " + + "table %s, partition %s, tablet detail: %s", + transactionState, tablet.getId(), newVersion, + loadRequiredReplicaNum, tableId, partitionId, writeDetail)); + } + continue; + } + + String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas, + tabletVersionFailedReplicas); + if (allowPublishOneSucc && healthReplicaNum > 0) { + if (publishResult == PublishResult.QUORUM_SUCC) { + publishResult = PublishResult.TIMEOUT_SUCC; + } + // We can not do any thing except retrying, + // because publish task is assigned a version, + // and doris does not permit discontinuous + // versions. + // + // If a timeout happens, it means that the rowset + // that are being publised exists on a few replicas we should go + // ahead, otherwise data may be lost and thre + // publish task hangs forever. + logs.add(String.format("publish version timeout succ for transaction %s on tablet %s " + + "with version %s, and has failed replicas, load require replica num %s. " + + "table %s, partition %s, tablet detail: %s", + transactionState, tablet.getId(), newVersion, + loadRequiredReplicaNum, tableId, partitionId, writeDetail)); + } else { + publishResult = PublishResult.FAILED; + String errMsg = String.format("publish on tablet %d failed." + + " succeed replica num %d < load required replica num %d." + + " table: %d, partition: %d, publish version: %d", + tablet.getId(), healthReplicaNum, loadRequiredReplicaNum, tableId, + partitionId, newVersion); + transactionState.setErrorMsg(errMsg); + logs.add(String.format("publish version failed for transaction %s on tablet %s with version" + + " %s, and has failed replicas, load required replica num %s. table %s, " + + "partition %s, tablet detail: %s", + transactionState, tablet.getId(), newVersion, + loadRequiredReplicaNum, tableId, partitionId, writeDetail)); + } + } + } + } + + boolean needLog = publishResult != PublishResult.FAILED + || now - transactionState.getLastPublishLogTime() > Config.publish_fail_log_interval_second * 1000L; + if (needLog) { + transactionState.setLastPublishLogTime(now); + for (String log : logs) { + LOG.info("{}. publish times {}", log, transactionState.getPublishCount()); + } + } + + return publishResult; } private boolean isAlterReplicaLoadedTxn(long transactionId, OlapTable table) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 9e2054f553..c4fc6de761 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -229,6 +229,12 @@ public class TransactionState implements Writable { private long lastPublishVersionTime = -1; + private long publishCount = 0; + + // txn may try finish many times and generate a lot of log. + // use lastPublishLogTime to reduce log. + private long lastPublishLogTime = 0; + @SerializedName(value = "callbackId") private long callbackId = -1; @@ -346,6 +352,7 @@ public class TransactionState implements Writable { } public void updateSendTaskTime() { + this.publishCount++; this.lastPublishVersionTime = System.currentTimeMillis(); if (this.firstPublishVersionTime <= 0) { this.firstPublishVersionTime = lastPublishVersionTime; @@ -360,6 +367,10 @@ public class TransactionState implements Writable { return this.lastPublishVersionTime; } + public long getPublishCount() { + return publishCount; + } + public boolean hasSendTask() { return this.hasSendTask; } @@ -428,6 +439,14 @@ public class TransactionState implements Writable { return errorLogUrl; } + public long getLastPublishLogTime() { + return lastPublishLogTime; + } + + public void setLastPublishLogTime(long lastPublishLogTime) { + this.lastPublishLogTime = lastPublishLogTime; + } + public void setTransactionStatus(TransactionStatus transactionStatus) { // status changed this.preStatus = this.transactionStatus;