[Enhancement] Add detail msg to show the reason of publish failure. (#3647)

Add 2 new columns, `PublishTime` and `ErrMsg`, to show the publish-version time and any errors that happen during the transaction process. They can be seen by executing:

`SHOW PROC "/transactions/dbId/";`
or
`SHOW TRANSACTION WHERE ID=xx;`

Currently, only errors that happen in the publish phase are recorded, which can help us find out which txn
is blocked.

Fix #3646
This commit is contained in:
Mingyu Chen
2020-05-22 22:59:53 +08:00
committed by GitHub
parent ba7d2dbf7b
commit 1124808fbc
5 changed files with 50 additions and 16 deletions

View File

@ -35,11 +35,13 @@ public class TransProcDir implements ProcDirInterface {
.add("LoadJobSourceType")
.add("PrepareTime")
.add("CommitTime")
.add("PublishTime")
.add("FinishTime")
.add("Reason")
.add("ErrorReplicasCount")
.add("ListenerId")
.add("TimeoutMs")
.add("ErrMsg")
.build();
public static final int MAX_SHOW_ENTRIES = 2000;

View File

@ -17,13 +17,6 @@
package org.apache.doris.transaction;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
@ -44,9 +37,9 @@ import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.Util;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
@ -59,6 +52,15 @@ import org.apache.doris.task.ClearTransactionTask;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -231,11 +233,13 @@ public class DatabaseTransactionMgr {
info.add(txnState.getSourceType().name());
info.add(TimeUtils.longToTimeString(txnState.getPrepareTime()));
info.add(TimeUtils.longToTimeString(txnState.getCommitTime()));
info.add(TimeUtils.longToTimeString(txnState.getPublishVersionTime()));
info.add(TimeUtils.longToTimeString(txnState.getFinishTime()));
info.add(txnState.getReason());
info.add(String.valueOf(txnState.getErrorReplicas().size()));
info.add(String.valueOf(txnState.getCallbackId()));
info.add(String.valueOf(txnState.getTimeoutMs()));
info.add(txnState.getErrMsg());
}
public long beginTransaction(List<Long> tableIdList, String label, TUniqueId requestId,
@ -579,8 +583,8 @@ public class DatabaseTransactionMgr {
}
public List<TransactionState> getCommittedTxnList() {
readLock();
try {
readLock();
// only send task to committed transaction
return idToRunningTransactionState.values().stream()
.filter(transactionState -> (transactionState.getTransactionStatus() == TransactionStatus.COMMITTED))
@ -653,6 +657,9 @@ public class DatabaseTransactionMgr {
transactionId,
partitionCommitInfo.getVersion(),
partition.getVisibleVersion());
String errMsg = String.format("wait for publishing partition %d version %d. self version: %d. table %d",
partitionId, partition.getVisibleVersion() + 1, partitionCommitInfo.getVersion(), tableId);
transactionState.setErrorMsg(errMsg);
return;
}
int quorumReplicaNum = partitionInfo.getReplicationNum(partitionId) / 2 + 1;
@ -721,8 +728,12 @@ public class DatabaseTransactionMgr {
}
if (healthReplicaNum < quorumReplicaNum) {
LOG.info("publish version failed for transaction {} on tablet {}, with only {} replicas less than quorum {}",
LOG.info("publish version failed for transaction {} on tablet {}, with only {} replicas less than quorum {}",
transactionState, tablet, healthReplicaNum, quorumReplicaNum);
String errMsg = String.format("publish on tablet %d failed. succeed replica num %d less than quorum %d."
+ " table: %d, partition: %d, publish version: %d",
tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId, partitionId, partition.getVisibleVersion() + 1);
transactionState.setErrorMsg(errMsg);
hasError = true;
}
}
@ -737,6 +748,7 @@ public class DatabaseTransactionMgr {
try {
transactionState.setErrorReplicas(errorReplicaIds);
transactionState.setFinishTime(System.currentTimeMillis());
transactionState.clearErrorMsg();
transactionState.setTransactionStatus(TransactionStatus.VISIBLE);
unprotectUpsertTransactionState(transactionState, false);
txnOperated = true;

View File

@ -167,7 +167,7 @@ public class PublishVersionDaemon extends MasterDaemon {
// the transaction's publish has timed out, but there are still unfinished tasks.
// we need to collect all error replicas, and try to finish this txn.
for (PublishVersionTask unfinishedTask : unfinishedTasks) {
// set all replica in the backend to error state
// set all replicas in the backend to error state
List<TPartitionVersionInfo> versionInfos = unfinishedTask.getPartitionVersionInfos();
Set<Long> errorPartitionIds = Sets.newHashSet();
for (TPartitionVersionInfo versionInfo : versionInfos) {
@ -177,6 +177,8 @@ public class PublishVersionDaemon extends MasterDaemon {
continue;
}
// get all tablets of these error partitions, and mark their replicas as error.
// currently we don't have a partition-to-tablet map in FE, so here we use an inefficient way.
// TODO(cmy): this is inefficient, but just keep it simple. will change it later.
List<Long> tabletIds = tabletInvertedIndex.getTabletIdsByBackendId(unfinishedTask.getBackendId());
for (long tabletId : tabletIds) {

View File

@ -187,7 +187,7 @@ public class TransactionState implements Writable {
// this state need not to be serialized
private Map<Long, PublishVersionTask> publishVersionTasks;
private boolean hasSendTask;
private long publishVersionTime;
private long publishVersionTime = -1;
private TransactionStatus preStatus = null;
private long callbackId = -1;
@ -206,6 +206,11 @@ public class TransactionState implements Writable {
private String errorLogUrl = null;
// record some error msgs during the transaction operation.
// this msg will be shown in show proc "/transactions/dbId/";
// no need to persist.
private String errMsg = "";
public TransactionState() {
this.dbId = -1;
this.tableIdList = Lists.newArrayList();
@ -646,4 +651,16 @@ public class TransactionState implements Writable {
}
}
}
/**
 * Records a message describing an error hit while processing this transaction.
 * The stored message is what {@link #getErrMsg()} returns.
 *
 * @param errMsg the message to record; {@code null} is stored as the empty string,
 *               the same value {@link #clearErrorMsg()} uses for "no error"
 */
public void setErrorMsg(String errMsg) {
    // Normalize null so getErrMsg() never returns null to callers that add it to a result row.
    this.errMsg = errMsg == null ? "" : errMsg;
}
/**
 * Resets the recorded error message to the empty string.
 */
public void clearErrorMsg() {
    errMsg = "";
}
/**
 * Returns the error message currently recorded on this transaction state.
 *
 * @return the stored message
 */
public String getErrMsg() {
    return errMsg;
}
}

View File

@ -214,10 +214,11 @@ public class DatabaseTransactionMgrTest {
assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(5)));
assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(6)));
assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(7)));
assertEquals("", txnInfo.get(8));
assertEquals("0", txnInfo.get(9));
assertEquals("-1", txnInfo.get(10));
assertEquals(String.valueOf(Config.stream_load_default_timeout_second * 1000), txnInfo.get(11));
assertTrue(currentTime > TimeUtils.timeStringToLong(txnInfo.get(8)));
assertEquals("", txnInfo.get(9));
assertEquals("0", txnInfo.get(10));
assertEquals("-1", txnInfo.get(11));
assertEquals(String.valueOf(Config.stream_load_default_timeout_second * 1000), txnInfo.get(12));
}
@Test