[improvement](sync version) fe sync version with be (#25236)
This commit is contained in:
@ -20,6 +20,7 @@ package org.apache.doris.catalog;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.DebugPointUtil;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
@ -114,6 +115,14 @@ public class Replica implements Writable {
|
||||
private TUniqueId cooldownMetaId;
|
||||
private long cooldownTerm = -1;
|
||||
|
||||
// A replica version should increase monotonically,
|
||||
// but a backend may miss some versions due to disk failure or bugs.
|
||||
// FE should detect this and mark the replica as missing versions.
|
||||
// If backend's report version < fe version, record the backend's report version as `regressiveVersion`,
|
||||
// and if the backend keeps reporting that same version for 5 minutes or more, FE should mark this replica as missing versions.
|
||||
private long regressiveVersion = -1;
|
||||
private long regressiveVersionTimestamp = 0;
|
||||
|
||||
/*
|
||||
* This can happen when this replica is created by a balance clone task, and
|
||||
* when task finished, the version of this replica is behind the partition's visible version.
|
||||
@ -435,9 +444,9 @@ public class Replica implements Writable {
|
||||
|
||||
if (lastFailedVersion != this.lastFailedVersion) {
|
||||
// Case 2:
|
||||
if (lastFailedVersion > this.lastFailedVersion) {
|
||||
if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) {
|
||||
this.lastFailedVersion = lastFailedVersion;
|
||||
this.lastFailedTimestamp = System.currentTimeMillis();
|
||||
this.lastFailedTimestamp = lastFailedVersion > 0 ? System.currentTimeMillis() : -1L;
|
||||
}
|
||||
|
||||
this.lastSuccessVersion = this.version;
|
||||
@ -506,10 +515,6 @@ public class Replica implements Writable {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * Overwrites {@code lastFailedVersion} with the given value.
 * Only that single field is assigned here; no timestamp is updated by this setter.
 *
 * @param lastFailedVersion the value to store
 */
// NOTE(review): the hunk header (-506,10 +515,6) suggests this setter sits on the removed side
// of the diff - confirm before relying on it.
public void setLastFailedVersion(long lastFailedVersion) {
|
||||
this.lastFailedVersion = lastFailedVersion;
|
||||
}
|
||||
|
||||
public void setState(ReplicaState replicaState) {
|
||||
this.state = replicaState;
|
||||
}
|
||||
@ -534,6 +539,25 @@ public class Replica implements Writable {
|
||||
this.versionCount = versionCount;
|
||||
}
|
||||
|
||||
public boolean checkVersionRegressive(long newVersion) {
|
||||
if (newVersion >= version) {
|
||||
regressiveVersion = -1;
|
||||
regressiveVersionTimestamp = -1;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (newVersion != regressiveVersion) {
|
||||
regressiveVersion = newVersion;
|
||||
regressiveVersionTimestamp = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 60 * 1000L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder strBuffer = new StringBuilder("[replicaId=");
|
||||
|
||||
@ -390,10 +390,22 @@ public class TabletInvertedIndex {
|
||||
if (backendTabletInfo.getVersion() > versionInFe) {
|
||||
// backend replica's version is larger or newer than replica in FE, sync it.
|
||||
return true;
|
||||
} else if (versionInFe == backendTabletInfo.getVersion() && replicaInFe.isBad()) {
|
||||
} else if (versionInFe == backendTabletInfo.getVersion()) {
|
||||
// backend replica's version is equal to replica in FE, but replica in FE is bad,
|
||||
// while backend replica is good, sync it
|
||||
return true;
|
||||
if (replicaInFe.isBad()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// FE's replica last failed version > partition's committed version.
|
||||
// This can occur when BE reports a missing version: FE then sets last failed version = visible version + 1,
|
||||
// so the last failed version may be greater than the partition's committed version.
|
||||
//
|
||||
// But the partition is not available here, so we only check lastFailedVersion == version + 1.
|
||||
// In ReportHandler.sync, we will check if last failed version > partition's committed version again.
|
||||
if (replicaInFe.getLastFailedVersion() == versionInFe + 1) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
@ -501,6 +513,12 @@ public class TabletInvertedIndex {
|
||||
// so we only return true if version_miss is true.
|
||||
return true;
|
||||
}
|
||||
|
||||
// backend versions regressive due to bugs
|
||||
if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@ -1074,6 +1074,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
|
||||
replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(),
|
||||
reportedTablet.getDataSize(), reportedTablet.getRowCount());
|
||||
if (replica.getLastFailedVersion() > partition.getCommittedVersion()
|
||||
&& reportedTablet.getVersion() >= partition.getCommittedVersion()
|
||||
//&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss()
|
||||
&& !(reportedTablet.isSetUsed() && !reportedTablet.isUsed())) {
|
||||
LOG.info("change replica {} of tablet {} 's last failed version to -1", replica, tabletId);
|
||||
replica.updateLastFailedVersion(-1L);
|
||||
}
|
||||
if (reportedTablet.isSetPathHash()) {
|
||||
replica.setPathHash(reportedTablet.getPathHash());
|
||||
}
|
||||
|
||||
@ -192,7 +192,7 @@ public class MasterImpl {
|
||||
finishRecoverTablet(task);
|
||||
break;
|
||||
case ALTER:
|
||||
finishAlterTask(task);
|
||||
finishAlterTask(task, request);
|
||||
break;
|
||||
case ALTER_INVERTED_INDEX:
|
||||
finishAlterInvertedIndexTask(task, request);
|
||||
@ -575,7 +575,7 @@ public class MasterImpl {
|
||||
return reportHandler.handleReport(request);
|
||||
}
|
||||
|
||||
private void finishAlterTask(AgentTask task) {
|
||||
private void finishAlterTask(AgentTask task, TFinishTaskRequest request) {
|
||||
AlterReplicaTask alterTask = (AlterReplicaTask) task;
|
||||
try {
|
||||
if (alterTask.getJobType() == JobType.ROLLUP) {
|
||||
@ -584,6 +584,11 @@ public class MasterImpl {
|
||||
Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask);
|
||||
}
|
||||
alterTask.setFinished(true);
|
||||
if (request.isSetReportVersion()) {
|
||||
long reportVersion = request.getReportVersion();
|
||||
Env.getCurrentSystemInfo().updateBackendReportVersion(
|
||||
task.getBackendId(), reportVersion, task.getDbId(), task.getTableId());
|
||||
}
|
||||
} catch (MetaNotFoundException e) {
|
||||
LOG.warn("failed to handle finish alter task: {}, {}", task.getSignature(), e.getMessage());
|
||||
}
|
||||
|
||||
@ -403,7 +403,8 @@ public class ReportHandler extends Daemon {
|
||||
}
|
||||
}
|
||||
|
||||
private static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) {
|
||||
// public for fe ut
|
||||
public static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) {
|
||||
long start = System.currentTimeMillis();
|
||||
LOG.info("backend[{}] reports {} tablet(s). report version: {}",
|
||||
backendId, backendTablets.size(), backendReportVersion);
|
||||
@ -607,6 +608,11 @@ public class ReportHandler extends Daemon {
|
||||
if (olapTable == null || !olapTable.writeLockIfExist()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (backendReportVersion < Env.getCurrentSystemInfo().getBackendReportVersion(backendId)) {
|
||||
break;
|
||||
}
|
||||
|
||||
try {
|
||||
long partitionId = tabletMeta.getPartitionId();
|
||||
Partition partition = olapTable.getPartition(partitionId);
|
||||
@ -660,14 +666,25 @@ public class ReportHandler extends Daemon {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (metaVersion < backendVersion
|
||||
|| (metaVersion == backendVersion && replica.isBad())) {
|
||||
|
||||
if (backendReportVersion < Env.getCurrentSystemInfo()
|
||||
.getBackendReportVersion(backendId)) {
|
||||
continue;
|
||||
boolean needSync = false;
|
||||
if (metaVersion < backendVersion) {
|
||||
needSync = true;
|
||||
} else if (metaVersion == backendVersion) {
|
||||
if (replica.isBad()) {
|
||||
needSync = true;
|
||||
}
|
||||
if (replica.getVersion() >= partition.getCommittedVersion()
|
||||
&& replica.getLastFailedVersion() > partition.getCommittedVersion()) {
|
||||
LOG.info("sync replica {} of tablet {} in backend {} in db {}. replica last failed"
|
||||
+ " version change to -1 because last failed version > replica's committed"
|
||||
+ " version {}",
|
||||
replica, tabletId, backendId, dbId, partition.getCommittedVersion());
|
||||
replica.updateLastFailedVersion(-1L);
|
||||
needSync = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (needSync) {
|
||||
// happens when
|
||||
// 1. PUSH finished in BE but failed or not yet report to FE
|
||||
// 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE
|
||||
@ -1048,18 +1065,25 @@ public class ReportHandler extends Daemon {
|
||||
break;
|
||||
}
|
||||
|
||||
if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) {
|
||||
if ((tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss())
|
||||
|| replica.checkVersionRegressive(tTabletInfo.getVersion())) {
|
||||
// If the origin last failed version is larger than 0, not change it.
|
||||
// Otherwise, we set last failed version to replica's version + 1.
|
||||
// Because last failed version should always be larger than replica's version.
|
||||
long newLastFailedVersion = replica.getLastFailedVersion();
|
||||
if (newLastFailedVersion < 0) {
|
||||
newLastFailedVersion = replica.getVersion() + 1;
|
||||
replica.updateLastFailedVersion(newLastFailedVersion);
|
||||
LOG.warn("set missing version for replica {} of tablet {} on backend {}, "
|
||||
+ "version in fe {}, version in be {}, be missing {}",
|
||||
replica.getId(), tabletId, backendId, replica.getVersion(),
|
||||
tTabletInfo.getVersion(), tTabletInfo.isVersionMiss());
|
||||
}
|
||||
replica.updateLastFailedVersion(newLastFailedVersion);
|
||||
backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
|
||||
break;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
@ -0,0 +1,176 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.DebugPointUtil;
|
||||
import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
|
||||
import org.apache.doris.master.ReportHandler;
|
||||
import org.apache.doris.thrift.TTablet;
|
||||
import org.apache.doris.thrift.TTabletInfo;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class RepairVersionTest extends TestWithFeService {
|
||||
private class TableInfo {
|
||||
Partition partition;
|
||||
Tablet tablet;
|
||||
Replica replica;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeCreatingConnectContext() throws Exception {
|
||||
Config.enable_debug_points = true;
|
||||
Config.disable_balance = true;
|
||||
Config.disable_tablet_scheduler = true;
|
||||
Config.allow_replica_on_same_host = true;
|
||||
Config.tablet_checker_interval_ms = 100;
|
||||
Config.tablet_schedule_interval_ms = 100;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runBeforeAll() throws Exception {
|
||||
createDatabase("test");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int backendNum() {
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRepairLastFailedVersionByClone() throws Exception {
|
||||
TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_clone");
|
||||
Partition partition = info.partition;
|
||||
Replica replica = info.replica;
|
||||
|
||||
replica.updateLastFailedVersion(replica.getVersion() + 1);
|
||||
Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion());
|
||||
|
||||
Config.disable_tablet_scheduler = false;
|
||||
Thread.sleep(1000);
|
||||
Config.disable_tablet_scheduler = true;
|
||||
|
||||
Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
|
||||
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRepairLastFailedVersionByReport() throws Exception {
|
||||
TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_report");
|
||||
Partition partition = info.partition;
|
||||
Tablet tablet = info.tablet;
|
||||
Replica replica = info.replica;
|
||||
|
||||
replica.updateLastFailedVersion(replica.getVersion() + 1);
|
||||
Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion());
|
||||
|
||||
TTabletInfo tTabletInfo = new TTabletInfo();
|
||||
tTabletInfo.setTabletId(tablet.getId());
|
||||
tTabletInfo.setSchemaHash(replica.getSchemaHash());
|
||||
tTabletInfo.setVersion(replica.getVersion());
|
||||
tTabletInfo.setPathHash(replica.getPathHash());
|
||||
tTabletInfo.setPartitionId(partition.getId());
|
||||
tTabletInfo.setReplicaId(replica.getId());
|
||||
|
||||
TTablet tTablet = new TTablet();
|
||||
tTablet.addToTabletInfos(tTabletInfo);
|
||||
Map<Long, TTablet> tablets = Maps.newHashMap();
|
||||
tablets.put(tablet.getId(), tTablet);
|
||||
Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
|
||||
|
||||
ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
|
||||
|
||||
Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
|
||||
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testVersionRegressive() throws Exception {
|
||||
TableInfo info = prepareTableForTest("tbl_version_regressive");
|
||||
Partition partition = info.partition;
|
||||
Tablet tablet = info.tablet;
|
||||
Replica replica = info.replica;
|
||||
|
||||
Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
|
||||
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
|
||||
Assertions.assertTrue(replica.getVersion() > 1L);
|
||||
|
||||
TTabletInfo tTabletInfo = new TTabletInfo();
|
||||
tTabletInfo.setTabletId(tablet.getId());
|
||||
tTabletInfo.setSchemaHash(replica.getSchemaHash());
|
||||
tTabletInfo.setVersion(1L); // be report version = 1 which less than fe version
|
||||
tTabletInfo.setPathHash(replica.getPathHash());
|
||||
tTabletInfo.setPartitionId(partition.getId());
|
||||
tTabletInfo.setReplicaId(replica.getId());
|
||||
|
||||
TTablet tTablet = new TTablet();
|
||||
tTablet.addToTabletInfos(tTabletInfo);
|
||||
Map<Long, TTablet> tablets = Maps.newHashMap();
|
||||
tablets.put(tablet.getId(), tTablet);
|
||||
|
||||
ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
|
||||
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
|
||||
|
||||
DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint());
|
||||
ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
|
||||
Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion());
|
||||
|
||||
Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
|
||||
}
|
||||
|
||||
private TableInfo prepareTableForTest(String tableName) throws Exception {
|
||||
createTable("CREATE TABLE test." + tableName + " (k INT) DISTRIBUTED BY HASH(k) "
|
||||
+ " BUCKETS 1 PROPERTIES ( \"replication_num\" = \"2\" )");
|
||||
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test");
|
||||
OlapTable tbl = (OlapTable) db.getTableOrMetaException(tableName);
|
||||
Assertions.assertNotNull(tbl);
|
||||
Partition partition = tbl.getPartitions().iterator().next();
|
||||
Tablet tablet = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
|
||||
.getTablets().iterator().next();
|
||||
|
||||
long visibleVersion = 2L;
|
||||
partition.updateVisibleVersion(visibleVersion);
|
||||
partition.setNextVersion(visibleVersion + 1);
|
||||
tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L));
|
||||
|
||||
Replica replica = tablet.getReplicas().iterator().next();
|
||||
Assertions.assertEquals(visibleVersion, replica.getVersion());
|
||||
Assertions.assertEquals(-1L, replica.getLastFailedVersion());
|
||||
|
||||
TableInfo info = new TableInfo();
|
||||
info.partition = partition;
|
||||
info.tablet = tablet;
|
||||
info.replica = replica;
|
||||
|
||||
return info;
|
||||
}
|
||||
}
|
||||
@ -392,7 +392,7 @@ public abstract class TestWithFeService {
|
||||
InterruptedException {
|
||||
int feRpcPort = startFEServer(runningDir);
|
||||
List<Backend> bes = Lists.newArrayList();
|
||||
System.out.println("start create backend");
|
||||
System.out.println("start create backend, backend num " + backendNum);
|
||||
for (int i = 0; i < backendNum; i++) {
|
||||
bes.add(createBackend("127.0.0.1", feRpcPort));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user