Fix some bugs (#566)
1. Backup obj should set state to NORMAL. 2. Replica with version 1-0 should be handled correctly.
This commit is contained in:
@ -1086,7 +1086,7 @@ public class SchemaChangeJob extends AlterJob {
|
||||
|
||||
@Override
|
||||
public void getJobInfo(List<List<Comparable>> jobInfos, OlapTable tbl) {
|
||||
if (changedIndexIdToSchema == null) {
|
||||
if (changedIndexIdToSchemaVersion == null) {
|
||||
// for compatibility
|
||||
if (state == JobState.FINISHED || state == JobState.CANCELLED) {
|
||||
List<Comparable> jobInfo = new ArrayList<Comparable>();
|
||||
|
||||
@ -298,7 +298,7 @@ public class BackupHandler extends Daemon implements Writable {
|
||||
}
|
||||
|
||||
// copy a table with selected partitions for calculating the signature
|
||||
OlapTable copiedTbl = olapTbl.selectiveCopy(tblRef.getPartitions());
|
||||
OlapTable copiedTbl = olapTbl.selectiveCopy(tblRef.getPartitions(), true);
|
||||
if (copiedTbl == null) {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
|
||||
"Failed to copy table " + tblName + " with selected partitions");
|
||||
|
||||
@ -418,7 +418,7 @@ public class BackupJob extends AbstractJob {
|
||||
for (TableRef tableRef : tableRefs) {
|
||||
String tblName = tableRef.getName().getTbl();
|
||||
OlapTable tbl = (OlapTable) db.getTable(tblName);
|
||||
OlapTable copiedTbl = tbl.selectiveCopy(tableRef.getPartitions());
|
||||
OlapTable copiedTbl = tbl.selectiveCopy(tableRef.getPartitions(), true);
|
||||
if (copiedTbl == null) {
|
||||
status = new Status(ErrCode.COMMON_ERROR, "faild to copy table: " + tblName);
|
||||
return;
|
||||
|
||||
@ -5812,7 +5812,7 @@ public class Catalog {
|
||||
}
|
||||
}
|
||||
|
||||
copiedTbl = olapTable.selectiveCopy(origPartitions.keySet());
|
||||
copiedTbl = olapTable.selectiveCopy(origPartitions.keySet(), true);
|
||||
|
||||
} finally {
|
||||
db.readUnlock();
|
||||
|
||||
@ -30,6 +30,8 @@ import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.backup.Status;
|
||||
import org.apache.doris.backup.Status.ErrCode;
|
||||
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
|
||||
import org.apache.doris.catalog.MaterializedIndex.IndexState;
|
||||
import org.apache.doris.catalog.Partition.PartitionState;
|
||||
import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.catalog.Tablet.TabletStatus;
|
||||
import org.apache.doris.clone.TabletSchedCtx;
|
||||
@ -893,13 +895,28 @@ public class OlapTable extends Table {
|
||||
return true;
|
||||
}
|
||||
|
||||
public OlapTable selectiveCopy(Collection<String> reservedPartNames) {
|
||||
public OlapTable selectiveCopy(Collection<String> reservedPartNames, boolean resetState) {
|
||||
OlapTable copied = new OlapTable();
|
||||
if (!DeepCopy.copy(this, copied)) {
|
||||
LOG.warn("failed to copy olap table: " + getName());
|
||||
return null;
|
||||
}
|
||||
|
||||
if (resetState) {
|
||||
copied.setState(OlapTableState.NORMAL);
|
||||
for (Partition partition : copied.getPartitions()) {
|
||||
partition.setState(PartitionState.NORMAL);
|
||||
for (MaterializedIndex idx : partition.getMaterializedIndices()) {
|
||||
idx.setState(IndexState.NORMAL);
|
||||
for (Tablet tablet : idx.getTablets()) {
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
replica.setState(ReplicaState.NORMAL);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (reservedPartNames == null || reservedPartNames.isEmpty()) {
|
||||
// reserve all
|
||||
return copied;
|
||||
|
||||
@ -203,7 +203,7 @@ public class Replica implements Writable {
|
||||
|
||||
LOG.warn("update replica {} on backend {}'s version for recovery. version: {}-{}:{}-{}."
|
||||
+ " last failed version: {}-{}:{}-{}, last success version: {}-{}:{}-{}",
|
||||
this.version, this.versionHash, newVersion, newVersionHash,
|
||||
this.id, this.backendId, this.version, this.versionHash, newVersion, newVersionHash,
|
||||
this.lastFailedVersion, this.lastFailedVersionHash, lastFailedVersion, lastFailedVersionHash,
|
||||
this.lastSuccessVersion, this.lastSuccessVersionHash, lastSuccessVersion, lastSuccessVersionHash);
|
||||
|
||||
@ -315,17 +315,17 @@ public class Replica implements Writable {
|
||||
this.lastSuccessVersion, this.lastSuccessVersionHash, dataSize, rowCount);
|
||||
}
|
||||
|
||||
public boolean checkVersionCatchUp(long committedVersion, long committedVersionHash) {
|
||||
if (committedVersion == Partition.PARTITION_INIT_VERSION
|
||||
&& committedVersionHash == Partition.PARTITION_INIT_VERSION_HASH) {
|
||||
public boolean checkVersionCatchUp(long expectedVersion, long expectedVersionHash) {
|
||||
if (expectedVersion == Partition.PARTITION_INIT_VERSION
|
||||
&& expectedVersionHash == Partition.PARTITION_INIT_VERSION_HASH) {
|
||||
// no data is loaded into this replica, just return true
|
||||
return true;
|
||||
}
|
||||
|
||||
if (this.version < committedVersion
|
||||
|| (this.version == committedVersion && this.versionHash != committedVersionHash)) {
|
||||
if (this.version < expectedVersion
|
||||
|| (this.version == expectedVersion && this.versionHash != expectedVersionHash)) {
|
||||
LOG.debug("replica version does not catch up with version: {}-{}. replica: {}",
|
||||
committedVersion, committedVersionHash, this);
|
||||
expectedVersion, expectedVersionHash, this);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
@ -371,6 +371,8 @@ public class Replica implements Writable {
|
||||
strBuffer.append(lastSuccessVersionHash);
|
||||
strBuffer.append(", lastFailedTimestamp=");
|
||||
strBuffer.append(lastFailedTimestamp);
|
||||
strBuffer.append(", schemaHash");
|
||||
strBuffer.append(schemaHash);
|
||||
return strBuffer.toString();
|
||||
}
|
||||
|
||||
|
||||
@ -170,12 +170,12 @@ public class Tablet extends MetaObject implements Writable {
|
||||
|
||||
// for query
|
||||
public void getQueryableReplicas(List<Replica> allQuerableReplica, List<Replica> localReplicas,
|
||||
long committedVersion, long committedVersionHash, long localBeId, int schemaHash) {
|
||||
long visibleVersion, long visibleVersionHash, long localBeId, int schemaHash) {
|
||||
for (Replica replica : replicas) {
|
||||
ReplicaState state = replica.getState();
|
||||
if (state == ReplicaState.NORMAL || state == ReplicaState.SCHEMA_CHANGE) {
|
||||
// replica.getSchemaHash() == -1 is for compatibility
|
||||
if (replica.checkVersionCatchUp(committedVersion, committedVersionHash)
|
||||
if (replica.checkVersionCatchUp(visibleVersion, visibleVersionHash)
|
||||
&& (replica.getSchemaHash() == -1 || replica.getSchemaHash() == schemaHash)) {
|
||||
allQuerableReplica.add(replica);
|
||||
if (localBeId != -1 && replica.getBackendId() == localBeId) {
|
||||
|
||||
@ -318,9 +318,23 @@ public class TabletInvertedIndex {
|
||||
* if be's report version < fe's meta version, it means some version is missing in BE
|
||||
* because of some unrecoverable failure.
|
||||
*/
|
||||
private boolean checkNeedRecover(Replica replicaMeta, long backendVersion, long backendVersionHash) {
|
||||
long metaVersion = replicaMeta.getVersion();
|
||||
if (metaVersion > backendVersion) {
|
||||
private boolean checkNeedRecover(Replica replicaInFe, long backendVersion, long backendVersionHash) {
|
||||
if (replicaInFe.getVersion() == 2 && replicaInFe.getVersionHash() == 0
|
||||
&& backendVersion == 1 && backendVersionHash == 0) {
|
||||
/*
|
||||
* This is very tricky:
|
||||
* 1. The newly created replica in FE is with version 1-0, but the new replica is BE is 2-0
|
||||
* 2. After the first tablet report, replica in FE with be sync with BE, update its version to 2-0
|
||||
* 3. A snapshot of replica with version 2-0 on BE is 1-0 (Because we send snapshot task with
|
||||
* partition's version, which is 1-0)
|
||||
* 4. And BE will report version 1-0, but in FE, its 2-0, so we fall into here.
|
||||
*
|
||||
* So here we ignore this kind of report
|
||||
*/
|
||||
return false;
|
||||
}
|
||||
|
||||
if (backendVersion < replicaInFe.getVersion()) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
||||
@ -1119,18 +1119,20 @@ public class TabletScheduler extends Daemon {
|
||||
return num;
|
||||
}
|
||||
|
||||
public synchronized List<String> getSlotInfo(long beId) {
|
||||
List<String> result = Lists.newArrayList();
|
||||
public synchronized List<List<String>> getSlotInfo(long beId) {
|
||||
List<List<String>> results = Lists.newArrayList();
|
||||
pathSlots.entrySet().stream().forEach(t -> {
|
||||
t.getValue().rectify();
|
||||
List<String> result = Lists.newArrayList();
|
||||
result.add(String.valueOf(beId));
|
||||
result.add(String.valueOf(t.getKey()));
|
||||
result.add(String.valueOf(t.getValue().available));
|
||||
result.add(String.valueOf(t.getValue().total));
|
||||
result.add(String.valueOf(t.getValue().balanceSlot));
|
||||
result.add(String.valueOf(t.getValue().getAvgRate()));
|
||||
results.add(result);
|
||||
});
|
||||
return result;
|
||||
return results;
|
||||
}
|
||||
|
||||
public synchronized long takeBalanceSlot(long pathHash) {
|
||||
@ -1173,7 +1175,7 @@ public class TabletScheduler extends Daemon {
|
||||
List<List<String>> result = Lists.newArrayList();
|
||||
for (long beId : backendsWorkingSlots.keySet()) {
|
||||
PathSlot slot = backendsWorkingSlots.get(beId);
|
||||
result.add(slot.getSlotInfo(beId));
|
||||
result.addAll(slot.getSlotInfo(beId));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -441,11 +441,15 @@ public class OlapScanNode extends ScanNode {
|
||||
List<Replica> allQueryableReplicas = Lists.newArrayList();
|
||||
List<Replica> localReplicas = Lists.newArrayList();
|
||||
tablet.getQueryableReplicas(allQueryableReplicas, localReplicas,
|
||||
visibleVersion, visibleVersionHash,
|
||||
localBeId, schemaHash);
|
||||
visibleVersion, visibleVersionHash, localBeId, schemaHash);
|
||||
if (allQueryableReplicas.isEmpty()) {
|
||||
LOG.error("no queryable replica found in tablet[{}]. committed version[{}], committed version hash[{}]",
|
||||
LOG.error("no queryable replica found in tablet {}. visible version {}-{}",
|
||||
tabletId, visibleVersion, visibleVersionHash);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
LOG.debug("tablet {}, replica: {}", tabletId, replica.toString());
|
||||
}
|
||||
}
|
||||
throw new UserException("Failed to get scan range, no queryable replica found in tablet: " + tabletId);
|
||||
}
|
||||
|
||||
|
||||
@ -140,7 +140,6 @@ public class ReplicaTest {
|
||||
// 3. Check equal
|
||||
for (int i = 0; i < 11; i++) {
|
||||
Assert.assertTrue(list1.get(i).equals(list2.get(i)));
|
||||
Assert.assertTrue(list1.get(i).toString().equals(list2.get(i).toString()));
|
||||
}
|
||||
|
||||
Assert.assertTrue(list1.get(1).equals(list1.get(1)));
|
||||
|
||||
Reference in New Issue
Block a user