[fix](publish) publish go ahead even if quorum is not met (#23806)
Co-authored-by: Yongqiang YANG <dataroaring@gmail.com>
This commit is contained in:
@ -487,6 +487,32 @@ public class Replica implements Writable {
|
||||
return strBuffer.toString();
|
||||
}
|
||||
|
||||
public String toStringSimple(boolean checkBeAlive) {
|
||||
StringBuilder strBuffer = new StringBuilder("[replicaId=");
|
||||
strBuffer.append(id);
|
||||
strBuffer.append(", backendId=");
|
||||
strBuffer.append(backendId);
|
||||
if (checkBeAlive) {
|
||||
strBuffer.append(", backendAlive=");
|
||||
strBuffer.append(Env.getCurrentSystemInfo().checkBackendAlive(backendId));
|
||||
}
|
||||
strBuffer.append(", version=");
|
||||
strBuffer.append(version);
|
||||
if (lastFailedVersion > 0) {
|
||||
strBuffer.append(", lastFailedVersion=");
|
||||
strBuffer.append(lastFailedVersion);
|
||||
strBuffer.append(", lastSuccessVersion=");
|
||||
strBuffer.append(lastSuccessVersion);
|
||||
strBuffer.append(", lastFailedTimestamp=");
|
||||
strBuffer.append(lastFailedTimestamp);
|
||||
}
|
||||
strBuffer.append(", state=");
|
||||
strBuffer.append(state.name());
|
||||
strBuffer.append("]");
|
||||
|
||||
return strBuffer.toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
out.writeLong(id);
|
||||
|
||||
@ -81,7 +81,6 @@ import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -95,6 +94,12 @@ import java.util.stream.Collectors;
|
||||
|
||||
public class DatabaseTransactionMgr {
|
||||
|
||||
private enum PublishResult {
|
||||
FAILED,
|
||||
TIMEOUT_SUCC, // each tablet has least one replica succ, and timeout
|
||||
QUORUM_SUCC, // each tablet has least quorum replicas succ
|
||||
}
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(DatabaseTransactionMgr.class);
|
||||
// the max number of txn that can be remove per round.
|
||||
// set it to avoid holding lock too long when removing too many txns per round.
|
||||
@ -485,32 +490,9 @@ public class DatabaseTransactionMgr {
|
||||
}
|
||||
tabletToBackends.get(tabletId).add(tabletCommitInfos.get(i).getBackendId());
|
||||
}
|
||||
List<String> tabletSuccReplicas = Lists.newArrayList();
|
||||
List<String> tabletWriteFailedReplicas = Lists.newArrayList();
|
||||
List<String> tabletVersionFailedReplicas = Lists.newArrayList();
|
||||
Function<Replica, String> getReplicaInfo = replica -> {
|
||||
StringBuilder strBuffer = new StringBuilder("[replicaId=");
|
||||
strBuffer.append(replica.getId());
|
||||
strBuffer.append(", backendId=");
|
||||
strBuffer.append(replica.getBackendId());
|
||||
strBuffer.append(", backendAlive=");
|
||||
strBuffer.append(Env.getCurrentSystemInfo().checkBackendAlive(replica.getBackendId()));
|
||||
strBuffer.append(", version=");
|
||||
strBuffer.append(replica.getVersion());
|
||||
if (replica.getLastFailedVersion() >= 0) {
|
||||
strBuffer.append(", lastFailedVersion=");
|
||||
strBuffer.append(replica.getLastFailedVersion());
|
||||
strBuffer.append(", lastSuccessVersion=");
|
||||
strBuffer.append(replica.getLastSuccessVersion());
|
||||
strBuffer.append(", lastFailedTimestamp=");
|
||||
strBuffer.append(replica.getLastFailedTimestamp());
|
||||
}
|
||||
strBuffer.append(", state=");
|
||||
strBuffer.append(replica.getState().name());
|
||||
strBuffer.append("]");
|
||||
|
||||
return strBuffer.toString();
|
||||
};
|
||||
List<Replica> tabletSuccReplicas = Lists.newArrayList();
|
||||
List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
|
||||
List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
|
||||
|
||||
for (long tableId : tableToPartition.keySet()) {
|
||||
OlapTable table = (OlapTable) db.getTableOrMetaException(tableId);
|
||||
@ -558,15 +540,12 @@ public class DatabaseTransactionMgr {
|
||||
tabletSuccReplicas.clear();
|
||||
tabletWriteFailedReplicas.clear();
|
||||
tabletVersionFailedReplicas.clear();
|
||||
int successReplicaNum = 0;
|
||||
long tabletId = tablet.getId();
|
||||
Set<Long> tabletBackends = tablet.getBackendIds();
|
||||
totalInvolvedBackends.addAll(tabletBackends);
|
||||
Set<Long> commitBackends = tabletToBackends.get(tabletId);
|
||||
// save the error replica ids for current tablet
|
||||
// this param is used for log
|
||||
Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
|
||||
String errorReplicaInfo = new String();
|
||||
for (long tabletBackend : tabletBackends) {
|
||||
Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend);
|
||||
if (replica == null) {
|
||||
@ -582,55 +561,28 @@ public class DatabaseTransactionMgr {
|
||||
// ignore it but not log it
|
||||
// for example, a replica is in clone state
|
||||
if (replica.getLastFailedVersion() < 0) {
|
||||
++successReplicaNum;
|
||||
tabletSuccReplicas.add(getReplicaInfo.apply(replica));
|
||||
tabletSuccReplicas.add(replica);
|
||||
} else {
|
||||
errorReplicaInfo += " replica [" + replica.getId() + "], lastFailedVersion ["
|
||||
+ replica.getLastFailedVersion() + "]";
|
||||
tabletVersionFailedReplicas.add(getReplicaInfo.apply(replica));
|
||||
tabletVersionFailedReplicas.add(replica);
|
||||
}
|
||||
} else {
|
||||
tabletWriteFailedReplicas.add(getReplicaInfo.apply(replica));
|
||||
errorBackendIdsForTablet.add(tabletBackend);
|
||||
tabletWriteFailedReplicas.add(replica);
|
||||
errorReplicaIds.add(replica.getId());
|
||||
// not remove rollup task here, because the commit maybe failed
|
||||
// remove rollup task when commit successfully
|
||||
errorReplicaInfo += " replica [" + replica.getId() + "] commitBackends null or "
|
||||
+ "tabletBackend [" + tabletBackend + "] does not "
|
||||
+ "in commitBackends";
|
||||
}
|
||||
}
|
||||
|
||||
int successReplicaNum = tabletSuccReplicas.size();
|
||||
if (successReplicaNum < quorumReplicaNum) {
|
||||
LOG.warn("Failed to commit txn [{}]. "
|
||||
+ "Tablet [{}] success replica num is {} < quorum replica num {} "
|
||||
+ "while error backends {} error replica info {} commitBackends {}",
|
||||
transactionState.getTransactionId(), tablet.getId(), successReplicaNum,
|
||||
quorumReplicaNum, Joiner.on(",").join(errorBackendIdsForTablet),
|
||||
errorReplicaInfo, commitBackends);
|
||||
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
|
||||
tabletVersionFailedReplicas);
|
||||
|
||||
String replicasDetailMsg = "";
|
||||
if (!tabletSuccReplicas.isEmpty()) {
|
||||
replicasDetailMsg += String.format("%s replicas final succ: { %s }; ",
|
||||
tabletSuccReplicas.size(), Joiner.on(", ").join(tabletSuccReplicas));
|
||||
}
|
||||
if (!tabletWriteFailedReplicas.isEmpty()) {
|
||||
replicasDetailMsg += String.format("%s replicas write data failed: { %s }; ",
|
||||
tabletWriteFailedReplicas.size(),
|
||||
Joiner.on(", ").join(tabletWriteFailedReplicas));
|
||||
}
|
||||
if (!tabletVersionFailedReplicas.isEmpty()) {
|
||||
replicasDetailMsg += String.format("%s replicas write data succ but miss previous "
|
||||
+ "version: { %s }.",
|
||||
tabletVersionFailedReplicas.size(),
|
||||
Joiner.on(", ").join(tabletVersionFailedReplicas));
|
||||
}
|
||||
String errMsg = String.format("Failed to commit txn %s, cause tablet %s succ replica "
|
||||
+ "num %s < quorum replica num %s. table %s, partition %s, this tablet detail: %s",
|
||||
transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, tableId,
|
||||
partition.getId(), writeDetail);
|
||||
LOG.info(errMsg);
|
||||
|
||||
throw new TabletQuorumFailedException(transactionId, String.format(
|
||||
"Failed to commit txn %s, cause tablet %s succ replica num %s < quorum "
|
||||
+ " replica num %s. table %s, partition %s, this tablet detail: %s",
|
||||
transactionId, tablet.getId(), successReplicaNum, quorumReplicaNum, tableId,
|
||||
partition.getId(), replicasDetailMsg));
|
||||
throw new TabletQuorumFailedException(transactionId, errMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -638,6 +590,32 @@ public class DatabaseTransactionMgr {
|
||||
}
|
||||
}
|
||||
|
||||
private String getTabletWriteDetail(List<Replica> tabletSuccReplicas, List<Replica> tabletWriteFailedReplicas,
|
||||
List<Replica> tabletVersionFailedReplicas) {
|
||||
String writeDetail = "";
|
||||
if (!tabletSuccReplicas.isEmpty()) {
|
||||
writeDetail += String.format("%s replicas final succ: { %s }; ",
|
||||
tabletSuccReplicas.size(), Joiner.on(", ").join(
|
||||
tabletSuccReplicas.stream().map(replica -> replica.toStringSimple(true))
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
if (!tabletWriteFailedReplicas.isEmpty()) {
|
||||
writeDetail += String.format("%s replicas write data failed: { %s }; ",
|
||||
tabletWriteFailedReplicas.size(), Joiner.on(", ").join(
|
||||
tabletWriteFailedReplicas.stream().map(replica -> replica.toStringSimple(true))
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
if (!tabletVersionFailedReplicas.isEmpty()) {
|
||||
writeDetail += String.format("%s replicas write data succ but miss previous "
|
||||
+ "version: { %s }.",
|
||||
tabletVersionFailedReplicas.size(), Joiner.on(",").join(
|
||||
tabletVersionFailedReplicas.stream().map(replica -> replica.toStringSimple(true))
|
||||
.collect(Collectors.toList())));
|
||||
}
|
||||
|
||||
return writeDetail;
|
||||
}
|
||||
|
||||
/**
|
||||
* commit transaction process as follows:
|
||||
* 1. validate whether `Load` is cancelled
|
||||
@ -907,6 +885,18 @@ public class DatabaseTransactionMgr {
|
||||
errorReplicaIds.addAll(originalErrorReplicas);
|
||||
}
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
long firstPublishOneSuccTime = transactionState.getFirstPublishOneSuccTime();
|
||||
boolean allowPublishOneSucc = false;
|
||||
if (Config.publish_wait_time_second > 0 && firstPublishOneSuccTime > 0
|
||||
&& now >= firstPublishOneSuccTime + Config.publish_wait_time_second * 1000L) {
|
||||
allowPublishOneSucc = true;
|
||||
}
|
||||
|
||||
List<Replica> tabletSuccReplicas = Lists.newArrayList();
|
||||
List<Replica> tabletWriteFailedReplicas = Lists.newArrayList();
|
||||
List<Replica> tabletVersionFailedReplicas = Lists.newArrayList();
|
||||
|
||||
// case 1 If database is dropped, then we just throw MetaNotFoundException, because all related tables are
|
||||
// already force dropped, we just ignore the transaction with all tables been force dropped.
|
||||
// case 2 If at least one table lock successfully, which means that the transaction should be finished for
|
||||
@ -920,8 +910,9 @@ public class DatabaseTransactionMgr {
|
||||
LOG.debug("finish transaction {} with tables {}", transactionId, tableIdList);
|
||||
List<? extends TableIf> tableList = db.getTablesOnIdOrderIfExist(tableIdList);
|
||||
tableList = MetaLockUtils.writeLockTablesIfExist(tableList);
|
||||
PublishResult publishResult = PublishResult.QUORUM_SUCC;
|
||||
try {
|
||||
boolean hasError = false;
|
||||
boolean allTabletsLeastOneSucc = true;
|
||||
Iterator<TableCommitInfo> tableCommitInfoIterator
|
||||
= transactionState.getIdToTableCommitInfos().values().iterator();
|
||||
while (tableCommitInfoIterator.hasNext()) {
|
||||
@ -985,48 +976,87 @@ public class DatabaseTransactionMgr {
|
||||
// Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
|
||||
for (MaterializedIndex index : allIndices) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
int healthReplicaNum = 0;
|
||||
tabletSuccReplicas.clear();
|
||||
tabletWriteFailedReplicas.clear();
|
||||
tabletVersionFailedReplicas.clear();
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
if (!errorReplicaIds.contains(replica.getId())) {
|
||||
if (replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
|
||||
++healthReplicaNum;
|
||||
tabletSuccReplicas.add(replica);
|
||||
} else {
|
||||
LOG.info("publish version {} failed for transaction {} on tablet {},"
|
||||
+ " on replica {} due to not catchup",
|
||||
partitionCommitInfo.getVersion(), transactionState, tablet,
|
||||
replica);
|
||||
tabletVersionFailedReplicas.add(replica);
|
||||
}
|
||||
} else if (replica.getVersion() >= partitionCommitInfo.getVersion()) {
|
||||
// the replica's version is larger than or equal to current transaction
|
||||
// partition's version the replica is normal, then remove it from error replica ids
|
||||
// TODO(cmy): actually I have no idea why we need this check
|
||||
tabletSuccReplicas.add(replica);
|
||||
errorReplicaIds.remove(replica.getId());
|
||||
++healthReplicaNum;
|
||||
} else {
|
||||
LOG.info("publish version {} failed for transaction {} on tablet {},"
|
||||
+ " on replica {} due to version hole or error",
|
||||
partitionCommitInfo.getVersion(), transactionState, tablet, replica);
|
||||
tabletWriteFailedReplicas.add(replica);
|
||||
}
|
||||
}
|
||||
|
||||
if (healthReplicaNum < quorumReplicaNum) {
|
||||
LOG.info("publish version {} failed for transaction {} on tablet {},"
|
||||
+ " with only {} replicas less than quorum {}",
|
||||
partitionCommitInfo.getVersion(), transactionState, tablet, healthReplicaNum,
|
||||
quorumReplicaNum);
|
||||
int healthReplicaNum = tabletSuccReplicas.size();
|
||||
if (healthReplicaNum >= quorumReplicaNum) {
|
||||
if (!tabletWriteFailedReplicas.isEmpty() || !tabletVersionFailedReplicas.isEmpty()) {
|
||||
String writeDetail = getTabletWriteDetail(tabletSuccReplicas,
|
||||
tabletWriteFailedReplicas, tabletVersionFailedReplicas);
|
||||
LOG.info("publish version quorum succ for transaction {} on tablet {} with version"
|
||||
+ " {}, and has failed replicas, quorum num {}. table {}, partition {},"
|
||||
+ " tablet detail: {}",
|
||||
transactionState, tablet, partitionCommitInfo.getVersion(),
|
||||
quorumReplicaNum, tableId, partitionId, writeDetail);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (healthReplicaNum == 0) {
|
||||
allTabletsLeastOneSucc = false;
|
||||
}
|
||||
|
||||
String writeDetail = getTabletWriteDetail(tabletSuccReplicas, tabletWriteFailedReplicas,
|
||||
tabletVersionFailedReplicas);
|
||||
if (allowPublishOneSucc && healthReplicaNum > 0) {
|
||||
if (publishResult == PublishResult.QUORUM_SUCC) {
|
||||
publishResult = PublishResult.TIMEOUT_SUCC;
|
||||
}
|
||||
// We can not do any thing except retrying,
|
||||
// because publish task is assigned a version,
|
||||
// and doris does not permit discontinuous
|
||||
// versions.
|
||||
//
|
||||
// If a timeout happens, it means that the rowset
|
||||
// that are being publised exists on a few replicas we should go
|
||||
// ahead, otherwise data may be lost and thre
|
||||
// publish task hangs forever.
|
||||
LOG.info("publish version timeout succ for transaction {} on tablet {} with version"
|
||||
+ " {}, and has failed replicas, quorum num {}. table {}, partition {},"
|
||||
+ " tablet detail: {}",
|
||||
transactionState, tablet, partitionCommitInfo.getVersion(), quorumReplicaNum,
|
||||
tableId, partitionId, writeDetail);
|
||||
} else {
|
||||
publishResult = PublishResult.FAILED;
|
||||
String errMsg = String.format("publish on tablet %d failed."
|
||||
+ " succeed replica num %d less than quorum %d."
|
||||
+ " table: %d, partition: %d, publish version: %d",
|
||||
tablet.getId(), healthReplicaNum, quorumReplicaNum, tableId,
|
||||
partitionId, partition.getVisibleVersion() + 1);
|
||||
transactionState.setErrorMsg(errMsg);
|
||||
hasError = true;
|
||||
LOG.info("publish version failed for transaction {} on tablet {} with version"
|
||||
+ " {}, and has failed replicas, quorum num {}. table {}, partition {},"
|
||||
+ " tablet detail: {}",
|
||||
transactionState, tablet, partitionCommitInfo.getVersion(), quorumReplicaNum,
|
||||
tableId, partitionId, writeDetail);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (hasError) {
|
||||
if (allTabletsLeastOneSucc && firstPublishOneSuccTime <= 0) {
|
||||
transactionState.setFirstPublishOneSuccTime(now);
|
||||
}
|
||||
if (publishResult == PublishResult.FAILED) {
|
||||
return;
|
||||
}
|
||||
boolean txnOperated = false;
|
||||
@ -1060,7 +1090,7 @@ public class DatabaseTransactionMgr {
|
||||
// Otherwise, there is no way for stream load to query the result right after loading finished,
|
||||
// even if we call "sync" before querying.
|
||||
transactionState.countdownVisibleLatch();
|
||||
LOG.info("finish transaction {} successfully", transactionState);
|
||||
LOG.info("finish transaction {} successfully, publish result: {}", transactionState, publishResult.name());
|
||||
}
|
||||
|
||||
protected void unprotectedPreCommitTransaction2PC(TransactionState transactionState, Set<Long> errorReplicaIds,
|
||||
|
||||
@ -219,8 +219,14 @@ public class TransactionState implements Writable {
|
||||
private long publishVersionTime = -1;
|
||||
private TransactionStatus preStatus = null;
|
||||
|
||||
// When publish txn, if every tablet has at least 1 replica published succ, but not quorum replicas succ,
|
||||
// and time since firstPublishOneSuccTime has exceeds Config.publish_wait_time_second,
|
||||
// then this transaction will become visible.
|
||||
private long firstPublishOneSuccTime = -1;
|
||||
|
||||
@SerializedName(value = "callbackId")
|
||||
private long callbackId = -1;
|
||||
|
||||
// In the beforeStateTransform() phase, we will get the callback object through the callbackId,
|
||||
// and if we get it, we will save it in this variable.
|
||||
// The main function of this variable is to retain a reference to this callback object.
|
||||
@ -387,6 +393,14 @@ public class TransactionState implements Writable {
|
||||
return errorLogUrl;
|
||||
}
|
||||
|
||||
public long getFirstPublishOneSuccTime() {
|
||||
return firstPublishOneSuccTime;
|
||||
}
|
||||
|
||||
public void setFirstPublishOneSuccTime(long firstPublishOneSuccTime) {
|
||||
this.firstPublishOneSuccTime = firstPublishOneSuccTime;
|
||||
}
|
||||
|
||||
public void setTransactionStatus(TransactionStatus transactionStatus) {
|
||||
// status changed
|
||||
this.preStatus = this.transactionStatus;
|
||||
|
||||
Reference in New Issue
Block a user