[fix](catalog) fix bug that replica missing version cause query -214 error (#9266)

1. Fix bug described in #9267 
    When report missing version replica, set last failed version to (replica version + 1)
2. Skip non-exist partition when handling transactions.
This commit is contained in:
Mingyu Chen
2022-05-03 17:54:19 +08:00
committed by GitHub
parent 49a0cd1925
commit dcf5f784d8
9 changed files with 61 additions and 66 deletions

View File

@ -6975,10 +6975,7 @@ public class Catalog {
replica.setBad(true);
break;
case MISSING_VERSION:
// The absolute value is meaningless, as long as it is greater than 0.
// This way, in other checking logic, if lastFailedVersion is found to be greater than 0,
// it will be considered a version missing replica and will be handled accordingly.
replica.setLastFailedVersion(1L);
replica.updateLastFailedVersion(info.lastFailedVersion);
break;
default:
break;

View File

@ -293,7 +293,7 @@ public class Replica implements Writable {
if (newVersion < this.version) {
// This case means that replica meta version has been updated by ReportHandler before
// For example, the publish version daemon has already sent some publish verison tasks to one be to publish version 2, 3, 4, 5, 6,
// For example, the publish version daemon has already sent some publish version tasks to one be to publish version 2, 3, 4, 5, 6,
// and the be finish all publish version tasks, the be's replica version is 6 now, but publish version daemon need to wait
// for other be to finish most of publish version tasks to update replica version in fe.
// At the moment, the replica version in fe is 4, when ReportHandler sync tablet, it find reported replica version in be is 6 and then

View File

@ -18,7 +18,6 @@
package org.apache.doris.master;
import com.google.common.collect.Sets;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
@ -80,6 +79,7 @@ import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
@ -703,8 +703,8 @@ public class ReportHandler extends Daemon {
AgentTaskExecutor.submit(batchTask);
}
LOG.info("delete {} tablet(s) from backend[{}]", deleteFromBackendCounter, backendId);
LOG.info("add {} replica(s) to meta. backend[{}]", addToMetaCounter, backendId);
LOG.info("delete {} tablet(s) and add {} replica(s) to meta from backend[{}]",
deleteFromBackendCounter, addToMetaCounter, backendId);
}
// replica is used and no version missing
@ -828,11 +828,15 @@ public class ReportHandler extends Daemon {
}
if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) {
// The absolute value is meaningless, as long as it is greater than 0.
// This way, in other checking logic, if lastFailedVersion is found to be greater than 0,
// it will be considered a version missing replica and will be handled accordingly.
replica.setLastFailedVersion(1L);
backendReplicasInfo.addMissingVersionReplica(tabletId);
// If the origin last failed version is larger than 0, not change it.
// Otherwise, we set last failed version to replica'version + 1.
// Because last failed version should always larger than replica's version.
long newLastFailedVersion = replica.getLastFailedVersion();
if (newLastFailedVersion < 0) {
newLastFailedVersion = replica.getVersion() + 1;
}
replica.updateLastFailedVersion(newLastFailedVersion);
backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
break;
}
}

View File

@ -44,11 +44,11 @@ public class BackendReplicasInfo implements Writable {
}
public void addBadReplica(long tabletId) {
replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.BAD));
replicaReportInfos.add(new ReplicaReportInfo(tabletId, -1, ReportInfoType.BAD));
}
public void addMissingVersionReplica(long tabletId) {
replicaReportInfos.add(new ReplicaReportInfo(tabletId, ReportInfoType.MISSING_VERSION));
public void addMissingVersionReplica(long tabletId, long lastFailedVersion) {
replicaReportInfos.add(new ReplicaReportInfo(tabletId, lastFailedVersion, ReportInfoType.MISSING_VERSION));
}
public long getBackendId() {
@ -84,9 +84,12 @@ public class BackendReplicasInfo implements Writable {
public long tabletId;
@SerializedName(value = "type")
public ReportInfoType type;
@SerializedName(value = "lastFailedVersion")
public long lastFailedVersion;
public ReplicaReportInfo(long tabletId, ReportInfoType type) {
public ReplicaReportInfo(long tabletId, long lastFailedVersion, ReportInfoType type) {
this.tabletId = tabletId;
this.lastFailedVersion = lastFailedVersion;
this.type = type;
}
@ -98,7 +101,12 @@ public class BackendReplicasInfo implements Writable {
public static ReplicaReportInfo read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class);
ReplicaReportInfo info = GsonUtils.GSON.fromJson(json, ReplicaReportInfo.class);
if (info.type == ReportInfoType.MISSING_VERSION && info.lastFailedVersion <= 0) {
// FIXME(cmy): Just for compatibility, should be remove in v1.2
info.lastFailedVersion = 1;
}
return info;
}
}

View File

@ -569,7 +569,7 @@ public class DatabaseTransactionMgr {
TxnCommitAttachment txnCommitAttachment, Boolean is2PC)
throws UserException {
// check status
// the caller method already own db lock, we do not obtain db lock here
// the caller method already own tables' write lock
Database db = catalog.getDbOrMetaException(dbId);
TransactionState transactionState;
readLock();
@ -644,7 +644,7 @@ public class DatabaseTransactionMgr {
LOG.info("transaction:[{}] successfully committed", transactionState);
}
public boolean publishTransaction(Database db, long transactionId, long timeoutMillis) throws TransactionCommitFailedException {
public boolean waitForTransactionFinished(Database db, long transactionId, long timeoutMillis) throws TransactionCommitFailedException {
TransactionState transactionState = null;
readLock();
try {
@ -857,48 +857,24 @@ public class DatabaseTransactionMgr {
}
}
// check success replica number for each tablet.
// a success replica means:
// 1. Not in errorReplicaIds: succeed in both commit and publish phase
// 2. last failed version < 0: is a health replica before
// 3. version catch up: not with a stale version
// Here we only check number, the replica version will be updated in updateCatalogAfterVisible()
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
int healthReplicaNum = 0;
for (Replica replica : tablet.getReplicas()) {
if (!errorReplicaIds.contains(replica.getId())
&& replica.getLastFailedVersion() < 0) {
// this means the replica is a healthy replica,
// it is healthy in the past and does not have error in current load
if (!errorReplicaIds.contains(replica.getId()) && replica.getLastFailedVersion() < 0) {
if (replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
// during rollup, the rollup replica's last failed version < 0,
// it may be treated as a normal replica.
// the replica is not failed during commit or publish
// during upgrade, one replica's last version maybe invalid,
// has to compare version hash.
// Here we still update the replica's info even if we failed to publish
// this txn, for the following case:
// replica A,B,C is successfully committed, but only A is successfully
// published,
// B and C is crashed, now we need a Clone task to repair this tablet.
// So, here we update A's version info, so that clone task will clone
// the latest version of data.
replica.updateVersionInfo(partitionCommitInfo.getVersion(),
replica.getDataSize(), replica.getRowCount());
++healthReplicaNum;
} else {
// this means the replica has error in the past, but we did not observe it
// during upgrade, one job maybe in quorum finished state, for example, A,B,C 3 replica
// A,B 's version is 10, C's version is 10 but C' 10 is abnormal should be rollback
// then we will detect this and set C's last failed version to 10 and last success version to 11
// this logic has to be replayed in checkpoint thread
replica.updateVersionInfo(replica.getVersion(),
partition.getVisibleVersion(),
partitionCommitInfo.getVersion());
LOG.warn("transaction state {} has error, the replica [{}] not appeared in error replica list "
+ " and its version not equal to partition commit version or commit version - 1"
+ " if its not a upgrade stage, its a fatal error. ", transactionState, replica);
}
} else if (replica.getVersion() >= partitionCommitInfo.getVersion()) {
// the replica's version is larger than or equal to current transaction partition's version
// the replica is normal, then remove it from error replica ids
// TODO(cmy): actually I have no idea why we need this check
errorReplicaIds.remove(replica.getId());
++healthReplicaNum;
}
@ -1490,14 +1466,19 @@ public class DatabaseTransactionMgr {
for (PartitionCommitInfo partitionCommitInfo : tableCommitInfo.getIdToPartitionCommitInfo().values()) {
long partitionId = partitionCommitInfo.getPartitionId();
Partition partition = table.getPartition(partitionId);
if (partition == null) {
LOG.warn("partition {} of table {} does not exist when update catalog after committed. transaction: {}, db: {}",
partitionId, tableId, transactionState.getTransactionId(), db.getId());
continue;
}
List<MaterializedIndex> allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
for (MaterializedIndex index : allIndices) {
List<Tablet> tablets = index.getTablets();
for (Tablet tablet : tablets) {
for (Replica replica : tablet.getReplicas()) {
if (errorReplicaIds.contains(replica.getId())) {
// should not use partition.getNextVersion and partition.getNextVersionHash because partition's next version hash is generated locally
// should get from transaction state
// TODO(cmy): do we need to update last failed version here?
// because in updateCatalogAfterVisible, it will be updated again.
replica.updateLastFailedVersion(partitionCommitInfo.getVersion());
}
}
@ -1522,6 +1503,11 @@ public class DatabaseTransactionMgr {
long partitionId = partitionCommitInfo.getPartitionId();
long newCommitVersion = partitionCommitInfo.getVersion();
Partition partition = table.getPartition(partitionId);
if (partition == null) {
LOG.warn("partition {} in table {} does not exist when update catalog after visible. transaction: {}, db: {}",
partitionId, tableId, transactionState.getTransactionId(), db.getId());
continue;
}
List<MaterializedIndex> allIndices = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL);
for (MaterializedIndex index : allIndices) {
for (Tablet tablet : index.getTablets()) {
@ -1531,7 +1517,7 @@ public class DatabaseTransactionMgr {
long lastSuccessVersion = replica.getLastSuccessVersion();
if (!errorReplicaIds.contains(replica.getId())) {
if (replica.getLastFailedVersion() > 0) {
// if the replica is a failed replica, then not changing version and version hash
// if the replica is a failed replica, then not changing version
newVersion = replica.getVersion();
} else if (!replica.checkVersionCatchUp(partition.getVisibleVersion(), true)) {
// this means the replica has error in the past, but we did not observe it

View File

@ -211,7 +211,7 @@ public class GlobalTransactionMgr implements Writable {
* @throws UserException
* @throws TransactionCommitFailedException
* @note it is necessary to optimize the `lock` mechanism and `lock` scope resulting from wait lock long time
* @note callers should get db.write lock before call this api
* @note callers should get all tables' write locks before call this api
*/
public void commitTransaction(long dbId, List<Table> tableList, long transactionId, List<TabletCommitInfo> tabletCommitInfos,
TxnCommitAttachment txnCommitAttachment)
@ -263,7 +263,7 @@ public class GlobalTransactionMgr implements Writable {
// so we just return false to indicate publish timeout
return false;
}
return dbTransactionMgr.publishTransaction(db, transactionId, publishTimeoutMillis);
return dbTransactionMgr.waitForTransactionFinished(db, transactionId, publishTimeoutMillis);
}
public void commitTransaction2PC(Database db, List<Table> tableList, long transactionId, long timeoutMillis)

View File

@ -79,10 +79,7 @@ public class PublishVersionDaemon extends MasterDaemon {
return;
}
// TODO yiguolei: could publish transaction state according to multi-tenant cluster info
// but should do more work. for example, if a table is migrate from one cluster to another cluster
// should publish to two clusters.
// attention here, we publish transaction state to all backends including dead backend, if not publish to dead backend
// ATTN, we publish transaction state to all backends including dead backend, if not publish to dead backend
// then transaction manager will treat it as success
List<Long> allBackends = Catalog.getCurrentSystemInfo().getBackendIds(false);
if (allBackends.isEmpty()) {
@ -198,7 +195,6 @@ public class PublishVersionDaemon extends MasterDaemon {
continue;
}
for (long tableId : transactionState.getTableIdList()) {
Table table = db.getTableNullable(tableId);
if (table == null || table.getType() != Table.TableType.OLAP) {

View File

@ -49,7 +49,7 @@ public class BackendReplicaInfosTest {
BackendReplicasInfo info = new BackendReplicasInfo(beId);
info.addBadReplica(tabletId1);
info.addMissingVersionReplica(tabletId2);
info.addMissingVersionReplica(tabletId2, 11);
checkInfo(info);
info.write(dos);
dos.flush();
@ -73,6 +73,7 @@ public class BackendReplicaInfosTest {
Assert.assertEquals(BackendReplicasInfo.ReportInfoType.BAD, reportInfo.type);
} else if (reportInfo.tabletId == tabletId2) {
Assert.assertEquals(BackendReplicasInfo.ReportInfoType.MISSING_VERSION, reportInfo.type);
Assert.assertEquals(11, reportInfo.lastFailedVersion);
} else {
Assert.fail("unknown tablet id: " + reportInfo.tabletId);
}

View File

@ -67,12 +67,13 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import mockit.Injectable;
import mockit.Mocked;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import mockit.Injectable;
import mockit.Mocked;
public class GlobalTransactionMgrTest {
private static FakeEditLog fakeEditLog;
@ -526,7 +527,9 @@ public class GlobalTransactionMgrTest {
Replica replica1 = tablet.getReplicaById(CatalogTestUtil.testReplicaId1);
Replica replica2 = tablet.getReplicaById(CatalogTestUtil.testReplicaId2);
Replica replica3 = tablet.getReplicaById(CatalogTestUtil.testReplicaId3);
assertEquals(CatalogTestUtil.testStartVersion + 1, replica1.getVersion());
// because after calling `finishTransaction`, the txn state is COMMITTED, not VISIBLE,
// so all replicas' version are not changed.
assertEquals(CatalogTestUtil.testStartVersion, replica1.getVersion());
assertEquals(CatalogTestUtil.testStartVersion, replica2.getVersion());
assertEquals(CatalogTestUtil.testStartVersion, replica3.getVersion());
assertEquals(-1, replica1.getLastFailedVersion());