diff --git a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java index 0cef447f6c..118cc857de 100644 --- a/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java +++ b/fe/src/main/java/org/apache/doris/common/proc/TransProcDir.java @@ -35,11 +35,13 @@ public class TransProcDir implements ProcDirInterface { .add("LoadJobSourceType") .add("PrepareTime") .add("CommitTime") + .add("PublishTime") .add("FinishTime") .add("Reason") .add("ErrorReplicasCount") .add("ListenerId") .add("TimeoutMs") + .add("ErrMsg") .build(); public static final int MAX_SHOW_ENTRIES = 2000; diff --git a/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index fef801f10b..ac2764efb0 100644 --- a/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -17,13 +17,6 @@ package org.apache.doris.transaction; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.apache.commons.collections.CollectionUtils; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.MaterializedIndex; @@ -44,9 +37,9 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.common.UserException; import org.apache.doris.common.util.Util; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PrivPredicate; @@ -59,6 +52,15 @@ import org.apache.doris.task.ClearTransactionTask; import org.apache.doris.task.PublishVersionTask; import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUniqueId; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -231,11 +233,13 @@ public class DatabaseTransactionMgr { info.add(txnState.getSourceType().name()); info.add(TimeUtils.longToTimeString(txnState.getPrepareTime())); info.add(TimeUtils.longToTimeString(txnState.getCommitTime())); + info.add(TimeUtils.longToTimeString(txnState.getPublishVersionTime())); info.add(TimeUtils.longToTimeString(txnState.getFinishTime())); info.add(txnState.getReason()); info.add(String.valueOf(txnState.getErrorReplicas().size())); info.add(String.valueOf(txnState.getCallbackId())); info.add(String.valueOf(txnState.getTimeoutMs())); + info.add(txnState.getErrMsg()); } public long beginTransaction(List tableIdList, String label, TUniqueId requestId, @@ -579,8 +583,8 @@ public class DatabaseTransactionMgr { } public List getCommittedTxnList() { + readLock(); try { - readLock(); // only send task to committed transaction return idToRunningTransactionState.values().stream() .filter(transactionState -> (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED)) @@ -653,6 +657,9 @@ public class DatabaseTransactionMgr { transactionId, partitionCommitInfo.getVersion(), partition.getVisibleVersion()); + String errMsg = String.format("wait for publishing partition %d version %d. self version: %d. table %d", + partitionId, partition.getVisibleVersion() + 1, partitionCommitInfo.getVersion(), tableId); + transactionState.setErrorMsg(errMsg); return; } int quorumReplicaNum = partitionInfo.getReplicationNum(partitionId) / 2 + 1; @@ -721,8 +728,12 @@ public class DatabaseTransactionMgr { } if (healthReplicaNum < quorumReplicaNum) { - LOG.info("publish version failed for transaction {} on tablet {}, with only {} replicas less than quorum {}", + LOG.info("publish version failed for transaction {} on tablet {}, with only {} replicas less than quorum {}", transactionState, tablet, healthReplicaNum, quorumReplicaNum); + String errMsg = String.format("publish on tablet %d failed. succeed replica num %d less than quorum %d." + + " table: %d, partition: %d, publish version: %d", + tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId, partitionId, partition.getVisibleVersion() + 1); + transactionState.setErrorMsg(errMsg); hasError = true; } } @@ -737,6 +748,7 @@ public class DatabaseTransactionMgr { try { transactionState.setErrorReplicas(errorReplicaIds); transactionState.setFinishTime(System.currentTimeMillis()); + transactionState.clearErrorMsg(); transactionState.setTransactionStatus(TransactionStatus.VISIBLE); unprotectUpsertTransactionState(transactionState, false); txnOperated = true; diff --git a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 1b637b0ce7..70b0e521b1 100644 --- a/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -167,7 +167,7 @@ public class PublishVersionDaemon extends MasterDaemon { // transaction's publish is timeout, but there still has unfinished tasks. // we need to collect all error replicas, and try to finish this txn. for (PublishVersionTask unfinishedTask : unfinishedTasks) { - // set all replica in the backend to error state + // set all replicas in the backend to error state List versionInfos = unfinishedTask.getPartitionVersionInfos(); Set errorPartitionIds = Sets.newHashSet(); for (TPartitionVersionInfo versionInfo : versionInfos) { @@ -177,6 +177,8 @@ public class PublishVersionDaemon extends MasterDaemon { continue; } + // get all tablets of these error partitions, and mark their replicas as error. + // current we don't have partition to tablet map in FE, so here we use an inefficient way. // TODO(cmy): this is inefficient, but just keep it simple. will change it later. List tabletIds = tabletInvertedIndex.getTabletIdsByBackendId(unfinishedTask.getBackendId()); for (long tabletId : tabletIds) { diff --git a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java index 4ddbfaf5ca..dbccc14183 100644 --- a/fe/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -187,7 +187,7 @@ public class TransactionState implements Writable { // this state need not to be serialized private Map publishVersionTasks; private boolean hasSendTask; - private long publishVersionTime; + private long publishVersionTime = -1; private TransactionStatus preStatus = null; private long callbackId = -1; @@ -206,6 +206,11 @@ public class TransactionState implements Writable { private String errorLogUrl = null; + // record some error msgs during the transaction operation. + // this msg will be shown in show proc "/transactions/dbId/"; + // no need to persist. + private String errMsg = ""; + public TransactionState() { this.dbId = -1; this.tableIdList = Lists.newArrayList(); @@ -646,4 +651,16 @@ public class TransactionState implements Writable { } } } + + public void setErrorMsg(String errMsg) { + this.errMsg = errMsg; + } + + public void clearErrorMsg() { + this.errMsg = ""; + } + + public String getErrMsg() { + return this.errMsg; + } } diff --git a/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java index 38399a1cb6..78c06991b4 100644 --- a/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/DatabaseTransactionMgrTest.java @@ -214,10 +214,11 @@ public class DatabaseTransactionMgrTest { assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(5))); assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(6))); assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(7))); - assertEquals("", txnInfo.get(8)); - assertEquals("0", txnInfo.get(9)); - assertEquals("-1", txnInfo.get(10)); - assertEquals(String.valueOf(Config.stream_load_default_timeout_second * 1000), txnInfo.get(11)); + assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(8))); + assertEquals("", txnInfo.get(9)); + assertEquals("0", txnInfo.get(10)); + assertEquals("-1", txnInfo.get(11)); + assertEquals(String.valueOf(Config.stream_load_default_timeout_second * 1000), txnInfo.get(12)); } @Test