Delete deprecated code in Frontend (#1463)

1. Delete Clone/CloneJob/CloneChecker
    The old clone framework is deprecated, using TabletChecker/TabletScheduler instead
2. Delete old BackupJob/RestoreJob
3. Delete OP_DROP_USER edit log
4. Delete CLONE_DONE edit log
This commit is contained in:
Mingyu Chen
2019-07-12 13:34:05 +08:00
committed by GitHub
parent 734032d917
commit 863eb83cb1
18 changed files with 15 additions and 5006 deletions

View File

@ -40,7 +40,6 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.Clone;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
@ -474,17 +473,10 @@ public class RollupHandler extends AlterHandler {
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
AgentBatchTask batchTask = new AgentBatchTask();
final String cloneFailMsg = "rollup index[" + rollupIndexName + "] has been dropped";
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.getIndex(rollupIndexId);
Preconditions.checkNotNull(rollupIndex);
// 1. remove clone job
Clone clone = Catalog.getInstance().getCloneInstance();
for (Tablet tablet : rollupIndex.getTablets()) {
clone.cancelCloneJob(tablet.getId(), cloneFailMsg);
}
// 2. delete rollup index
partition.deleteRollupIndex(rollupIndexId);

View File

@ -1,245 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Deprecated
public class AbstractBackupJob_D implements Writable {
private static final Logger LOG = LogManager.getLogger(AbstractBackupJob_D.class);
protected long jobId;
protected long dbId;
protected LabelName labelName;
protected String errMsg;
protected String remotePath;
protected Map<String, String> remoteProperties;
// unfinished tabletId (reuse for snapshot, upload)
// tabletId -> backendId
protected Multimap<Long, Long> unfinishedTabletIds;
protected long createTime;
protected long finishedTime;
protected PathBuilder pathBuilder;
protected CommandBuilder commandBuilder;
protected Future<String> future;
public AbstractBackupJob_D() {
unfinishedTabletIds = HashMultimap.create();
}
public AbstractBackupJob_D(long jobId, long dbId, LabelName labelName, String remotePath,
Map<String, String> remoteProperties) {
this.jobId = jobId;
this.dbId = dbId;
this.labelName = labelName;
this.remotePath = remotePath;
this.remoteProperties = remoteProperties;
this.unfinishedTabletIds = HashMultimap.create();
this.errMsg = "";
this.createTime = System.currentTimeMillis();
this.finishedTime = -1L;
}
public long getJobId() {
return jobId;
}
public long getDbId() {
return dbId;
}
public String getDbName() {
return labelName.getDbName();
}
public String getLabel() {
return labelName.getLabelName();
}
public String getLocalDirName() {
return getDbName() + "_" + getLabel();
}
public void setErrMsg(String errMsg) {
this.errMsg = errMsg;
}
public String getErrMsg() {
return errMsg;
}
public String getRemotePath() {
return remotePath;
}
public Map<String, String> getRemoteProperties() {
return remoteProperties;
}
public long getCreateTime() {
return createTime;
}
public long getFinishedTime() {
return finishedTime;
}
public List<Comparable> getJobInfo() {
throw new NotImplementedException();
}
public synchronized int getLeftTasksNum() {
if (unfinishedTabletIds != null) {
return unfinishedTabletIds.size();
} else {
return -1;
}
}
public void end(Catalog catalog, boolean isReplay) {
throw new NotImplementedException();
}
public void runOnce() {
throw new NotImplementedException();
}
public synchronized List<List<Comparable>> getUnfinishedInfos() {
List<List<Comparable>> infos = Lists.newArrayList();
if (unfinishedTabletIds != null) {
for (Long tabletId : unfinishedTabletIds.keySet()) {
Collection<Long> backendIds = unfinishedTabletIds.get(tabletId);
List<Comparable> info = Lists.newArrayList();
info.add(tabletId);
info.add(Joiner.on(",").join(backendIds));
infos.add(info);
}
}
return infos;
}
protected void clearJob() {
throw new NotImplementedException();
}
protected boolean checkFuture(String msg) throws InterruptedException, ExecutionException, IOException {
if (!future.isDone()) {
LOG.info("waiting {} finished. job: {}", msg, jobId);
return false;
} else {
String errMsg = future.get();
// reset future;
future = null;
if (errMsg != null) {
throw new IOException("failed to " + msg + " [" + errMsg + "]. backup job[" + jobId + "]");
} else {
LOG.info("{} finished. job: {}", msg, jobId);
return true;
}
}
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(jobId);
out.writeLong(dbId);
labelName.write(out);
Text.writeString(out, errMsg);
Text.writeString(out, remotePath);
if (remoteProperties == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = remoteProperties.size();
out.writeInt(size);
for (Map.Entry<String, String> entry : remoteProperties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
out.writeLong(createTime);
out.writeLong(finishedTime);
if (pathBuilder == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
pathBuilder.write(out);
}
}
@Override
public void readFields(DataInput in) throws IOException {
jobId = in.readLong();
dbId = in.readLong();
labelName = new LabelName();
labelName.readFields(in);
errMsg = Text.readString(in);
remotePath = Text.readString(in);
if (in.readBoolean()) {
remoteProperties = Maps.newHashMap();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String value = Text.readString(in);
remoteProperties.put(key, value);
}
}
createTime = in.readLong();
finishedTime = in.readLong();
if (in.readBoolean()) {
pathBuilder = new PathBuilder();
pathBuilder.readFields(in);
}
}
}

View File

@ -1,945 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.DeleteInfo;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadJob;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.ReleaseSnapshotTask;
import org.apache.doris.task.SnapshotTask;
import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@Deprecated
public class BackupJob_D extends AbstractBackupJob_D {
private static final Logger LOG = LogManager.getLogger(BackupJob_D.class);
private static final long SNAPSHOT_TIMEOUT_MS = 2000; // 1s for one tablet
public enum BackupJobState {
PENDING,
SNAPSHOT,
UPLOAD,
UPLOADING,
FINISHING,
FINISHED,
CANCELLED
}
private BackupJobState state;
private String lastestLoadLabel;
private DeleteInfo lastestDeleteInfo;
// all partitions need to be backuped
private Map<Long, Set<Long>> tableIdToPartitionIds;
private Multimap<Long, Long> tableIdToIndexIds;
// partition id -> (version, version hash)
private Map<Long, Pair<Long, Long>> partitionIdToVersionInfo;
private Map<Long, Pair<Long, String>> tabletIdToSnapshotPath;
private long metaSavedTime;
private long snapshotFinishedTime;
private long uploadFinishedTime;
private long phasedTimeoutMs;
private String readableManifestPath;
public BackupJob_D() {
super();
tableIdToPartitionIds = Maps.newHashMap();
tableIdToIndexIds = HashMultimap.create();
partitionIdToVersionInfo = Maps.newHashMap();
tabletIdToSnapshotPath = Maps.newHashMap();
}
public BackupJob_D(long jobId, long dbId, LabelName labelName, String backupPath,
Map<String, String> remoteProperties) {
super(jobId, dbId, labelName, backupPath, remoteProperties);
this.state = BackupJobState.PENDING;
tableIdToPartitionIds = Maps.newHashMap();
tableIdToIndexIds = HashMultimap.create();
partitionIdToVersionInfo = Maps.newHashMap();
tabletIdToSnapshotPath = Maps.newHashMap();
metaSavedTime = -1;
snapshotFinishedTime = -1;
uploadFinishedTime = -1;
phasedTimeoutMs = -1;
lastestLoadLabel = "N/A";
readableManifestPath = "";
}
public void setState(BackupJobState state) {
this.state = state;
}
public BackupJobState getState() {
return state;
}
public String getLatestLoadLabel() {
return lastestLoadLabel;
}
public DeleteInfo getLastestDeleteInfo() {
return lastestDeleteInfo;
}
public PathBuilder getPathBuilder() {
return pathBuilder;
}
public long getMetaSavedTimeMs() {
return metaSavedTime;
}
public long getSnapshotFinishedTimeMs() {
return snapshotFinishedTime;
}
public long getUploadFinishedTimeMs() {
return uploadFinishedTime;
}
public String getReadableManifestPath() {
return readableManifestPath;
}
public Map<Long, Set<Long>> getTableIdToPartitionIds() {
return tableIdToPartitionIds;
}
public void addPartitionId(long tableId, long partitionId) {
Set<Long> partitionIds = tableIdToPartitionIds.get(tableId);
if (partitionIds == null) {
partitionIds = Sets.newHashSet();
tableIdToPartitionIds.put(tableId, partitionIds);
}
if (partitionId != -1L) {
partitionIds.add(partitionId);
}
LOG.debug("add partition[{}] from table[{}], job[{}]", partitionId, tableId, jobId);
}
public void addIndexId(long tableId, long indexId) {
tableIdToIndexIds.put(tableId, indexId);
LOG.debug("add index[{}] from table[{}], job[{}]", indexId, tableId, jobId);
}
public void handleFinishedSnapshot(long tabletId, long backendId, String snapshotPath) {
synchronized (unfinishedTabletIds) {
if (!unfinishedTabletIds.containsKey(tabletId)) {
LOG.warn("backup job[{}] does not contains tablet[{}]", jobId, tabletId);
return;
}
if (unfinishedTabletIds.get(tabletId) == null
|| !unfinishedTabletIds.get(tabletId).contains(backendId)) {
LOG.warn("backup job[{}] does not contains tablet[{}]'s snapshot from backend[{}]. "
+ "it should from backend[{}]",
jobId, tabletId, backendId, unfinishedTabletIds.get(tabletId));
return;
}
unfinishedTabletIds.remove(tabletId, backendId);
}
synchronized (tabletIdToSnapshotPath) {
tabletIdToSnapshotPath.put(tabletId, new Pair<Long, String>(backendId, snapshotPath));
}
LOG.debug("finished add tablet[{}] from backend[{}]. snapshot path: {}", tabletId, backendId, snapshotPath);
}
public void handleFinishedUpload(long tabletId, long backendId) {
synchronized (unfinishedTabletIds) {
if (unfinishedTabletIds.remove(tabletId, backendId)) {
LOG.debug("finished upload tablet[{}] snapshot, backend[{}]", tabletId, backendId);
}
}
}
@Override
public List<Comparable> getJobInfo() {
List<Comparable> jobInfo = Lists.newArrayList();
jobInfo.add(jobId);
jobInfo.add(getLabel());
jobInfo.add(state.name());
jobInfo.add(TimeUtils.longToTimeString(createTime));
jobInfo.add(TimeUtils.longToTimeString(metaSavedTime));
jobInfo.add(TimeUtils.longToTimeString(snapshotFinishedTime));
jobInfo.add(TimeUtils.longToTimeString(uploadFinishedTime));
jobInfo.add(TimeUtils.longToTimeString(finishedTime));
jobInfo.add(errMsg);
jobInfo.add(PathBuilder.createPath(remotePath, getLabel()));
jobInfo.add(getReadableManifestPath());
jobInfo.add(getLeftTasksNum());
jobInfo.add(getLatestLoadLabel());
return jobInfo;
}
@Override
public void runOnce() {
LOG.debug("begin to run backup job: {}, state: {}", jobId, state.name());
try {
switch (state) {
case PENDING:
saveMetaAndMakeSnapshot();
break;
case SNAPSHOT:
waitSnapshot();
break;
case UPLOAD:
upload();
break;
case UPLOADING:
waitUpload();
break;
case FINISHING:
finishing();
break;
default:
break;
}
} catch (Exception e) {
errMsg = e.getMessage() == null ? "Unknown Exception" : e.getMessage();
LOG.warn("failed to backup: " + errMsg + ", job[" + jobId + "]", e);
state = BackupJobState.CANCELLED;
}
if (state == BackupJobState.FINISHED || state == BackupJobState.CANCELLED) {
end(Catalog.getInstance(), false);
}
}
private void saveMetaAndMakeSnapshot() throws DdlException, IOException {
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
throw new DdlException("[" + getDbName() + "] does not exist");
}
try {
pathBuilder = PathBuilder.createPathBuilder(getLocalDirName());
} catch (IOException e) {
pathBuilder = null;
throw e;
}
// file path -> writable objs
Map<String, List<? extends Writable>> pathToWritables = Maps.newHashMap();
// 1. get meta
getMeta(db, pathToWritables);
// 2. write meta
// IO ops should be done outside db.lock
try {
writeMeta(pathToWritables);
} catch (IOException e) {
errMsg = e.getMessage();
state = BackupJobState.CANCELLED;
return;
}
metaSavedTime = System.currentTimeMillis();
LOG.info("save meta finished. path: {}, job: {}", pathBuilder.getRoot().getFullPath(), jobId);
// 3. send snapshot tasks
snapshot(db);
}
private void getMeta(Database db, Map<String, List<? extends Writable>> pathToWritables) throws DdlException {
db.readLock();
try {
for (long tableId : tableIdToPartitionIds.keySet()) {
Table table = db.getTable(tableId);
if (table == null) {
throw new DdlException("table[" + tableId + "] does not exist");
}
// 1. get table meta
getTableMeta(db.getFullName(), table, pathToWritables);
if (table.getType() != TableType.OLAP) {
// this is not a OLAP table. just save table meta
continue;
}
OlapTable olapTable = (OlapTable) table;
// 2. get rollup meta
// 2.1 check all indices exist
for (Long indexId : tableIdToIndexIds.get(tableId)) {
if (olapTable.getIndexNameById(indexId) == null) {
errMsg = "Index[" + indexId + "] does not exist";
state = BackupJobState.CANCELLED;
return;
}
}
getRollupMeta(db.getFullName(), olapTable, pathToWritables);
// 3. save partition meta
Collection<Long> partitionIds = tableIdToPartitionIds.get(tableId);
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE) {
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
List<Map.Entry<Long, Range<PartitionKey>>> rangeMap = rangePartitionInfo.getSortedRangeMap();
for (Map.Entry<Long, Range<PartitionKey>> entry : rangeMap) {
long partitionId = entry.getKey();
if (!partitionIds.contains(partitionId)) {
continue;
}
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
throw new DdlException("partition[" + partitionId + "] does not exist");
}
getPartitionMeta(db.getFullName(), olapTable, partition.getName(), pathToWritables);
// save version info
partitionIdToVersionInfo.put(partitionId,
new Pair<Long, Long>(partition.getVisibleVersion(),
partition.getVisibleVersionHash()));
}
} else {
Preconditions.checkState(partitionIds.size() == 1);
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
// save version info
partitionIdToVersionInfo.put(partitionId,
new Pair<Long, Long>(partition.getVisibleVersion(),
partition.getVisibleVersionHash()));
}
}
} // end for tables
// get last finished load job and delele job label
Load load = Catalog.getInstance().getLoadInstance();
LoadJob lastestLoadJob = load.getLastestFinishedLoadJob(dbId);
if (lastestLoadJob == null) {
// there is no load job, or job info has been removed
lastestLoadLabel = "N/A";
} else {
lastestLoadLabel = lastestLoadJob.getLabel();
}
LOG.info("get lastest load job label: {}, job: {}", lastestLoadJob, jobId);
lastestDeleteInfo = load.getLastestFinishedDeleteInfo(dbId);
LOG.info("get lastest delete info: {}, job: {}", lastestDeleteInfo, jobId);
LOG.info("get meta finished. job[{}]", jobId);
} finally {
db.readUnlock();
}
}
private void getTableMeta(String dbName, Table table, Map<String, List<? extends Writable>> pathToWritables) {
CreateTableStmt stmt = table.toCreateTableStmt(dbName);
int tableSignature = table.getSignature(BackupVersion.VERSION_1);
stmt.setTableSignature(tableSignature);
List<CreateTableStmt> stmts = Lists.newArrayList(stmt);
String filePath = pathBuilder.createTableStmt(dbName, table.getName());
Preconditions.checkState(!pathToWritables.containsKey(filePath));
throw new RuntimeException("Don't support CreateTableStmt serialization.");
// pathToWritables.put(filePath, stmts);
}
private void getRollupMeta(String dbName, OlapTable olapTable,
Map<String, List<? extends Writable>> pathToWritables) {
Set<Long> indexIds = Sets.newHashSet(tableIdToIndexIds.get(olapTable.getId()));
if (indexIds.size() == 1) {
// only contains base index. do nothing
return;
} else {
// remove base index id
Preconditions.checkState(indexIds.size() > 1);
indexIds.remove(olapTable.getId());
}
AlterTableStmt stmt = olapTable.toAddRollupStmt(dbName, indexIds);
String filePath = pathBuilder.addRollupStmt(dbName, olapTable.getName());
List<AlterTableStmt> stmts = Lists.newArrayList(stmt);
Preconditions.checkState(!pathToWritables.containsKey(filePath));
pathToWritables.put(filePath, stmts);
}
private void getPartitionMeta(String dbName, OlapTable olapTable, String partitionName,
Map<String, List<? extends Writable>> pathToWritables) {
AlterTableStmt stmt = olapTable.toAddPartitionStmt(dbName, partitionName);
String filePath = pathBuilder.addPartitionStmt(dbName, olapTable.getName(), partitionName);
List<AlterTableStmt> stmts = Lists.newArrayList(stmt);
Preconditions.checkState(!pathToWritables.containsKey(filePath));
pathToWritables.put(filePath, stmts);
}
private void writeMeta(Map<String, List<? extends Writable>> pathToWritables) throws IOException {
// 1. write meta
for (Map.Entry<String, List<? extends Writable>> entry : pathToWritables.entrySet()) {
String filePath = entry.getKey();
List<? extends Writable> writables = entry.getValue();
ObjectWriter.write(filePath, writables);
}
}
private void snapshot(Database db) throws DdlException {
AgentBatchTask batchTask = new AgentBatchTask();
long dbId = db.getId();
db.readLock();
try {
for (Map.Entry<Long, Set<Long>> entry : tableIdToPartitionIds.entrySet()) {
long tableId = entry.getKey();
Set<Long> partitionIds = entry.getValue();
Table table = db.getTable(tableId);
if (table == null) {
throw new DdlException("table[" + tableId + "] does not exist");
}
if (table.getType() != TableType.OLAP) {
continue;
}
OlapTable olapTable = (OlapTable) table;
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
throw new DdlException("partition[" + partitionId + "] does not exist");
}
Pair<Long, Long> versionInfo = partitionIdToVersionInfo.get(partitionId);
for (Long indexId : tableIdToIndexIds.get(tableId)) {
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
MaterializedIndex index = partition.getIndex(indexId);
if (index == null) {
throw new DdlException("index[" + indexId + "] does not exist");
}
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
List<Long> backendIds = Lists.newArrayList();
for (Replica replica : tablet.getReplicas()) {
if (replica.checkVersionCatchUp(versionInfo.first, versionInfo.second)) {
backendIds.add(replica.getBackendId());
}
}
if (backendIds.isEmpty()) {
String msg = "tablet[" + tabletId + "] does not check up with version: "
+ versionInfo.first + "-" + versionInfo.second;
// this should not happen
LOG.error(msg);
throw new DdlException(msg);
}
long chosenBackendId = -1;
SnapshotTask task = new SnapshotTask(null, chosenBackendId, tabletId, jobId, dbId, tableId,
partitionId, indexId, tabletId,
versionInfo.first, versionInfo.second,
schemaHash, -1L, false);
LOG.debug("choose backend[{}] to make snapshot for tablet[{}]", chosenBackendId, tabletId);
batchTask.addTask(task);
unfinishedTabletIds.put(tabletId, chosenBackendId);
} // end for tablet
} // end for indices
} // end for partitions
} // end for tables
} finally {
db.readUnlock();
}
phasedTimeoutMs = unfinishedTabletIds.size() * SNAPSHOT_TIMEOUT_MS;
LOG.debug("estimate snapshot timeout: {}, tablet size: {}", phasedTimeoutMs, unfinishedTabletIds.size());
// send task
for (AgentTask task : batchTask.getAllTasks()) {
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
state = BackupJobState.SNAPSHOT;
LOG.info("finish send snapshot task. job: {}", jobId);
}
private synchronized void waitSnapshot() throws DdlException {
if (unfinishedTabletIds.isEmpty()) {
snapshotFinishedTime = System.currentTimeMillis();
state = BackupJobState.UPLOAD;
Catalog.getInstance().getEditLog().logBackupFinishSnapshot(this);
LOG.info("backup job[{}] is finished making snapshot", jobId);
return;
} else if (System.currentTimeMillis() - metaSavedTime > phasedTimeoutMs) {
// remove task in AgentTaskQueue
for (Map.Entry<Long, Long> entry : unfinishedTabletIds.entries()) {
AgentTaskQueue.removeTask(entry.getValue(), TTaskType.MAKE_SNAPSHOT, entry.getKey());
}
// check timeout
String msg = "snapshot timeout. " + phasedTimeoutMs + "s.";
LOG.warn("{}. job[{}]", msg, jobId);
throw new DdlException(msg);
} else {
LOG.debug("waiting {} tablets to make snapshot", unfinishedTabletIds.size());
}
}
private void upload() throws IOException, DdlException, InterruptedException, ExecutionException {
LOG.debug("start upload. job[{}]", jobId);
if (commandBuilder == null) {
String remotePropFilePath = pathBuilder.remoteProperties();
commandBuilder = CommandBuilder.create(remotePropFilePath, remoteProperties);
}
Preconditions.checkNotNull(commandBuilder);
// 1. send meta to remote source
if (!uploadMetaObjs()) {
return;
}
// 2. send upload task to be
sendUploadTasks();
}
private boolean uploadMetaObjs() throws IOException, InterruptedException, ExecutionException {
if (future == null) {
LOG.info("begin to submit upload meta objs. job: {}", jobId);
String dest = PathBuilder.createPath(remotePath, getLabel());
String uploadCmd = commandBuilder.uploadCmd(getLabel(), pathBuilder.getRoot().getFullPath(), dest);
MetaUploadTask uploadTask = new MetaUploadTask(uploadCmd);
// future = Catalog.getInstance().getBackupHandler().getAsynchronousCmdExecutor().submit(uploadTask);
return false;
} else {
return checkFuture("upload meta objs");
}
}
private synchronized void sendUploadTasks() throws DdlException {
Preconditions.checkState(unfinishedTabletIds.isEmpty());
AgentBatchTask batchTask = new AgentBatchTask();
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
throw new DdlException("database[" + getDbName() + "] does not exist");
}
db.readLock();
try {
String dbName = db.getFullName();
for (Map.Entry<Long, Set<Long>> entry : tableIdToPartitionIds.entrySet()) {
long tableId = entry.getKey();
Set<Long> partitionIds = entry.getValue();
Table table = db.getTable(tableId);
if (table == null) {
throw new DdlException("table[" + tableId + "] does not exist");
}
if (table.getType() != TableType.OLAP) {
continue;
}
OlapTable olapTable = (OlapTable) table;
String tableName = olapTable.getName();
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
throw new DdlException("partition[" + partitionId + "] does not exist");
}
String partitionName = partition.getName();
for (Long indexId : tableIdToIndexIds.get(tableId)) {
MaterializedIndex index = partition.getIndex(indexId);
if (index == null) {
throw new DdlException("index[" + index + "] does not exist");
}
String indexName = olapTable.getIndexNameById(indexId);
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
if (!tabletIdToSnapshotPath.containsKey(tabletId)) {
// this should not happend
String msg = "tablet[" + tabletId + "]'s snapshot is missing";
LOG.error(msg);
throw new DdlException(msg);
}
Pair<Long, String> snapshotInfo = tabletIdToSnapshotPath.get(tabletId);
String dest = pathBuilder.tabletRemotePath(dbName, tableName, partitionName,
indexName, tabletId, remotePath, getLabel());
unfinishedTabletIds.put(tabletId, snapshotInfo.first);
} // end for tablet
} // end for indices
} // end for partitions
} // end for tables
} finally {
db.readUnlock();
}
// send task
for (AgentTask task : batchTask.getAllTasks()) {
AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
state = BackupJobState.UPLOADING;
LOG.info("finish send upload task. job: {}", jobId);
}
private synchronized void waitUpload() throws DdlException {
if (unfinishedTabletIds.isEmpty()) {
LOG.info("backup job[{}] is finished upload snapshot", jobId);
uploadFinishedTime = System.currentTimeMillis();
state = BackupJobState.FINISHING;
return;
} else {
LOG.debug("waiting {} tablets to upload snapshot", unfinishedTabletIds.size());
}
}
private void finishing() throws DdlException, InterruptedException, ExecutionException, IOException {
// save manifest and upload
// manifest contain all file under {label}/
if (future == null) {
LOG.info("begin to submit save and upload manifest. job: {}", jobId);
String deleteInfo = lastestDeleteInfo == null ? "" : lastestDeleteInfo.toString();
SaveManifestTask task = new SaveManifestTask(jobId, getLabel(), remotePath, getLocalDirName(),
lastestLoadLabel, deleteInfo, pathBuilder, commandBuilder);
// future = Catalog.getInstance().getBackupHandler().getAsynchronousCmdExecutor().submit(task);
} else {
boolean finished = checkFuture("save and upload manifest");
if (finished) {
// reset future
readableManifestPath =
PathBuilder.createPath(remotePath, getLabel(), PathBuilder.READABLE_MANIFEST_NAME);
future = null;
state = BackupJobState.FINISHED;
}
}
}
public void restoreTableState(Catalog catalog) {
Database db = catalog.getDb(dbId);
if (db != null) {
db.writeLock();
try {
for (long tableId : tableIdToPartitionIds.keySet()) {
Table table = db.getTable(tableId);
if (table != null && table.getType() == TableType.OLAP) {
if (((OlapTable) table).getState() == OlapTableState.BACKUP) {
((OlapTable) table).setState(OlapTableState.NORMAL);
LOG.debug("set table[{}] state to NORMAL", table.getName());
}
}
}
} finally {
db.writeUnlock();
}
}
}
private void removeLeftTasks() {
for (Map.Entry<Long, Long> entry : unfinishedTabletIds.entries()) {
AgentTaskQueue.removeTask(entry.getValue(), TTaskType.MAKE_SNAPSHOT, entry.getKey());
AgentTaskQueue.removeTask(entry.getValue(), TTaskType.UPLOAD, entry.getKey());
}
}
@Override
public void end(Catalog catalog, boolean isReplay) {
// 1. set table state
restoreTableState(catalog);
if (!isReplay) {
// 2. remove agent tasks if left
removeLeftTasks();
if (pathBuilder == null) {
finishedTime = System.currentTimeMillis();
Catalog.getInstance().getEditLog().logBackupFinish(this);
LOG.info("finished end job[{}]. state: {}", jobId, state.name());
return;
}
// 3. remove local file
String labelDir = pathBuilder.getRoot().getFullPath();
Util.deleteDirectory(new File(labelDir));
LOG.debug("delete local dir: {}", labelDir);
// 4. release snapshot
synchronized (tabletIdToSnapshotPath) {
AgentBatchTask batchTask = new AgentBatchTask();
for (Long tabletId : tabletIdToSnapshotPath.keySet()) {
long backendId = tabletIdToSnapshotPath.get(tabletId).first;
String snapshotPath = tabletIdToSnapshotPath.get(tabletId).second;
ReleaseSnapshotTask task = new ReleaseSnapshotTask(null, backendId, dbId, tabletId, snapshotPath);
batchTask.addTask(task);
}
// no need to add to AgentTaskQueue
AgentTaskExecutor.submit(batchTask);
}
finishedTime = System.currentTimeMillis();
Catalog.getInstance().getEditLog().logBackupFinish(this);
}
clearJob();
LOG.info("finished end job[{}]. state: {}, replay: {}", jobId, state.name(), isReplay);
}
@Override
protected void clearJob() {
tableIdToPartitionIds = null;
tableIdToIndexIds = null;
partitionIdToVersionInfo = null;
tabletIdToSnapshotPath = null;
unfinishedTabletIds = null;
remoteProperties = null;
pathBuilder = null;
commandBuilder = null;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, state.name());
Text.writeString(out, lastestLoadLabel);
if (lastestDeleteInfo == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
lastestDeleteInfo.write(out);
}
if (tableIdToPartitionIds == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = tableIdToPartitionIds.size();
out.writeInt(size);
for (Map.Entry<Long, Set<Long>> entry : tableIdToPartitionIds.entrySet()) {
out.writeLong(entry.getKey());
size = entry.getValue().size();
out.writeInt(size);
for (Long partitionId : entry.getValue()) {
out.writeLong(partitionId);
}
}
}
if (tableIdToIndexIds == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Collection<Map.Entry<Long, Long>> entries = tableIdToIndexIds.entries();
int size = entries.size();
out.writeInt(size);
for (Map.Entry<Long, Long> entry : entries) {
out.writeLong(entry.getKey());
out.writeLong(entry.getValue());
}
}
if (partitionIdToVersionInfo == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = partitionIdToVersionInfo.size();
out.writeInt(size);
for (Map.Entry<Long, Pair<Long, Long>> entry : partitionIdToVersionInfo.entrySet()) {
out.writeLong(entry.getKey());
Pair<Long, Long> pair = entry.getValue();
out.writeLong(pair.first);
out.writeLong(pair.second);
}
}
if (tabletIdToSnapshotPath == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = tabletIdToSnapshotPath.size();
out.writeInt(size);
for (Map.Entry<Long, Pair<Long, String>> entry : tabletIdToSnapshotPath.entrySet()) {
out.writeLong(entry.getKey());
Pair<Long, String> pair = entry.getValue();
out.writeLong(pair.first);
Text.writeString(out, pair.second);
}
}
out.writeLong(metaSavedTime);
out.writeLong(snapshotFinishedTime);
out.writeLong(uploadFinishedTime);
out.writeLong(phasedTimeoutMs);
Text.writeString(out, readableManifestPath);
if (pathBuilder == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
pathBuilder.write(out);
}
if (commandBuilder == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
commandBuilder.write(out);
}
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
state = BackupJobState.valueOf(Text.readString(in));
lastestLoadLabel = Text.readString(in);
if (in.readBoolean()) {
lastestDeleteInfo = new DeleteInfo();
lastestDeleteInfo.readFields(in);
}
if (in.readBoolean()) {
int size = in.readInt();
for (int i = 0; i < size; i++) {
long tableId = in.readLong();
Set<Long> partitionIds = Sets.newHashSet();
tableIdToPartitionIds.put(tableId, partitionIds);
int count = in.readInt();
for (int j = 0; j < count; j++) {
long partitionId = in.readLong();
partitionIds.add(partitionId);
}
}
}
if (in.readBoolean()) {
int size = in.readInt();
for (int i = 0; i < size; i++) {
long tableId = in.readLong();
long indexId = in.readLong();
tableIdToIndexIds.put(tableId, indexId);
}
}
if (in.readBoolean()) {
int size = in.readInt();
for (int i = 0; i < size; i++) {
long partitionId = in.readLong();
long version = in.readLong();
long versionHash = in.readLong();
partitionIdToVersionInfo.put(partitionId, new Pair<Long, Long>(version, versionHash));
}
}
if (in.readBoolean()) {
int size = in.readInt();
for (int i = 0; i < size; i++) {
long tabletId = in.readLong();
long backendId = in.readLong();
String path = Text.readString(in);
tabletIdToSnapshotPath.put(tabletId, new Pair<Long, String>(backendId, path));
}
}
metaSavedTime = in.readLong();
snapshotFinishedTime = in.readLong();
uploadFinishedTime = in.readLong();
phasedTimeoutMs = in.readLong();
readableManifestPath = Text.readString(in);
if (in.readBoolean()) {
pathBuilder = new PathBuilder();
pathBuilder.readFields(in);
}
if (in.readBoolean()) {
commandBuilder = new CommandBuilder();
commandBuilder.readFields(in);
}
}
}

View File

@ -1,887 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
import org.apache.doris.alter.RollupHandler;
import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.AddRollupClause;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@Deprecated
public class RestoreJob_D extends AbstractBackupJob_D {
private static final Logger LOG = LogManager.getLogger(RestoreJob_D.class);
public enum RestoreJobState {
PENDING,
RESTORE_META,
DOWNLOAD,
DOWNLOADING,
FINISHED,
CANCELLED
}
private RestoreJobState state;
private Map<String, Set<String>> tableToPartitionNames;
private Map<String, String> tableRenameMap;
private Map<String, CreateTableStmt> tableToCreateTableStmt;
private Map<String, AlterTableStmt> tableToRollupStmt;
private com.google.common.collect.Table<String, String, AlterTableStmt> tableToPartitionStmts;
private Map<String, Boolean> tableToReplace;
private Map<String, Table> restoredTables;
// tableid - partition name - partition
private com.google.common.collect.Table<Long, String, Partition> restoredPartitions;
private long metaRestoredTime;
private long downloadFinishedTime;
public RestoreJob_D() {
super();
}
public RestoreJob_D(long jobId, long dbId, LabelName labelName, String restorePath,
Map<String, String> remoteProperties, Map<String, Set<String>> tableToPartitionNames,
Map<String, String> tableRenameMap) {
super(jobId, dbId, labelName, restorePath, remoteProperties);
state = RestoreJobState.PENDING;
this.tableToPartitionNames = tableToPartitionNames;
this.tableRenameMap = tableRenameMap;
this.tableToCreateTableStmt = Maps.newHashMap();
this.tableToRollupStmt = Maps.newHashMap();
this.tableToPartitionStmts = HashBasedTable.create();
this.tableToReplace = Maps.newHashMap();
this.restoredTables = Maps.newHashMap();
this.restoredPartitions = HashBasedTable.create();
this.metaRestoredTime = -1L;
this.downloadFinishedTime = -1L;
}
public void setState(RestoreJobState state) {
this.state = state;
}
public RestoreJobState getState() {
return state;
}
public long getMetaRestoredTime() {
return metaRestoredTime;
}
public long getDownloadFinishedTime() {
return downloadFinishedTime;
}
public Map<String, Set<String>> getTableToPartitionNames() {
return tableToPartitionNames;
}
@Override
public List<Comparable> getJobInfo() {
List<Comparable> jobInfo = Lists.newArrayList();
jobInfo.add(jobId);
jobInfo.add(getLabel());
jobInfo.add(state.name());
jobInfo.add(TimeUtils.longToTimeString(createTime));
jobInfo.add(TimeUtils.longToTimeString(metaRestoredTime));
jobInfo.add(TimeUtils.longToTimeString(downloadFinishedTime));
jobInfo.add(TimeUtils.longToTimeString(finishedTime));
jobInfo.add(errMsg);
jobInfo.add(remotePath);
jobInfo.add(getLeftTasksNum());
return jobInfo;
}
@Override
public void runOnce() {
LOG.debug("begin to run restore job: {}, state: {}", jobId, state.name());
try {
switch (state) {
case PENDING:
downloadBackupMeta();
break;
case RESTORE_META:
restoreMeta();
break;
case DOWNLOAD:
download();
break;
case DOWNLOADING:
waitDownload();
break;
default:
break;
}
} catch (Exception e) {
errMsg = Strings.nullToEmpty(e.getMessage());
LOG.warn("failed to restore: [" + errMsg + "], job[" + jobId + "]", e);
state = RestoreJobState.CANCELLED;
}
if (state == RestoreJobState.FINISHED || state == RestoreJobState.CANCELLED) {
end(Catalog.getInstance(), false);
}
}
private void downloadBackupMeta() throws DdlException, IOException, AnalysisException, InterruptedException,
ExecutionException {
Catalog catalog = Catalog.getInstance();
Database db = catalog.getDb(dbId);
if (db == null) {
throw new DdlException("Database[" + getDbName() + "] does not exist");
}
if (pathBuilder == null) {
pathBuilder = PathBuilder.createPathBuilder(getLocalDirName());
}
if (commandBuilder == null) {
String remotePropFilePath = pathBuilder.remoteProperties();
commandBuilder = CommandBuilder.create(remotePropFilePath, remoteProperties);
}
if (future == null) {
// 1. download manifest
LOG.info("begin to submit download backup meta. job: {}", jobId);
MetaDownloadTask task = new MetaDownloadTask(jobId, getDbName(), getLabel(), getLocalDirName(), remotePath,
pathBuilder, commandBuilder,
tableToPartitionNames, tableToCreateTableStmt,
tableToRollupStmt, tableToPartitionStmts, tableToReplace,
tableRenameMap);
// future = Catalog.getInstance().getBackupHandler().getAsynchronousCmdExecutor().submit(task);
} else {
boolean finished = checkFuture("download backup meta");
if (!finished) {
return;
}
future = null;
state = RestoreJobState.RESTORE_META;
}
}
private void restoreMeta() throws DdlException {
Catalog catalog = Catalog.getInstance();
Database db = catalog.getDb(dbId);
if (db == null) {
throw new DdlException("Database[" + getDbName() + "] does not exist");
}
for (Map.Entry<String, CreateTableStmt> entry : tableToCreateTableStmt.entrySet()) {
String newTableName = entry.getKey();
CreateTableStmt createTableStmt = entry.getValue();
Boolean replace = tableToReplace.get(newTableName);
if (replace) {
// 1. create table
Table restoredTable = catalog.createTable(createTableStmt, true);
restoredTables.put(newTableName, restoredTable);
if (restoredTable.getType() != TableType.OLAP) {
continue;
}
OlapTable restoredOlapTable = (OlapTable) restoredTable;
// 2. create rollup
RollupHandler rollupHandler = catalog.getRollupHandler();
AlterTableStmt rollupStmt = tableToRollupStmt.get(newTableName);
if (rollupStmt != null) {
// check if new table name conflicts with rollup index name
for (AlterClause clause : rollupStmt.getOps()) {
Preconditions.checkState(clause instanceof AddRollupClause);
String rollupName = ((AddRollupClause) clause).getRollupName();
if (rollupName.equals(newTableName)) {
throw new DdlException("New table name[" + newTableName
+ "] conflicts with rollup index name");
}
}
rollupHandler.process(rollupStmt.getOps(), db, restoredOlapTable, true);
}
// 3. create partition
Map<String, AlterTableStmt> partitionStmts = tableToPartitionStmts.row(newTableName);
if (partitionStmts.isEmpty()) {
continue;
}
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) restoredOlapTable.getPartitionInfo();
for (Map.Entry<String, AlterTableStmt> entry2 : partitionStmts.entrySet()) {
AlterTableStmt stmt = entry2.getValue();
AddPartitionClause clause = (AddPartitionClause) stmt.getOps().get(0);
Pair<Long, Partition> res = catalog.addPartition(db, newTableName, restoredOlapTable, clause, true);
Partition partition = res.second;
rangePartitionInfo.handleNewSinglePartitionDesc(clause.getSingeRangePartitionDesc(),
partition.getId());
restoredOlapTable.addPartition(partition);
}
} else {
Map<String, AlterTableStmt> partitionStmts = tableToPartitionStmts.row(newTableName);
for (Map.Entry<String, AlterTableStmt> entry2 : partitionStmts.entrySet()) {
AlterTableStmt stmt = entry2.getValue();
Pair<Long, Partition> res = catalog.addPartition(db, newTableName, null,
(AddPartitionClause) stmt.getOps().get(0), true);
long tableId = res.first;
Partition partition = res.second;
restoredPartitions.put(tableId, partition.getName(), partition);
}
}
}
metaRestoredTime = System.currentTimeMillis();
state = RestoreJobState.DOWNLOAD;
LOG.info("finished restore tables. job[{}]", jobId);
}
private void download() {
for (Map.Entry<String, Table> entry : restoredTables.entrySet()) {
String newTableName = entry.getKey();
String tableName = tableRenameMap.get(newTableName);
Table table = entry.getValue();
if (table.getType() != TableType.OLAP) {
continue;
}
AgentBatchTask batchTask = new AgentBatchTask();
OlapTable olapTable = (OlapTable) table;
long tableId = olapTable.getId();
for (Partition partition : olapTable.getPartitions()) {
String partitionName = partition.getName();
if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
// single partition table
partitionName = tableName;
}
long partitionId = partition.getId();
for (MaterializedIndex index : partition.getMaterializedIndices()) {
long indexId = index.getId();
String indexName = olapTable.getIndexNameById(index.getId());
if (indexName.equals(newTableName)) {
// base index
indexName = tableName;
}
List<Long> orderedBackupedTabletIdList = getRestoredTabletInfo(tableName, partitionName, indexName);
int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
List<Tablet> tablets = index.getTablets();
for (int i = 0; i < tablets.size(); i++) {
Tablet tablet = tablets.get(i);
Long backupedTabletId = orderedBackupedTabletIdList.get(i);
String remoteFilePath = PathBuilder.createPath(remotePath, getDbName(), tableName,
partitionName, indexName,
backupedTabletId.toString());
for (Replica replica : tablet.getReplicas()) {
}
} // end for tablets
} // end for indices
} // end for partitions
synchronized (unfinishedTabletIds) {
for (AgentTask task : batchTask.getAllTasks()) {
AgentTaskQueue.addTask(task);
unfinishedTabletIds.put(task.getTabletId(), task.getBackendId());
}
}
AgentTaskExecutor.submit(batchTask);
LOG.info("finished send restore tasks for table: {}, job: {}", tableName, jobId);
} // end for tables
state = RestoreJobState.DOWNLOADING;
LOG.info("finished send all restore tasks. job: {}", jobId);
}
private List<Long> getRestoredTabletInfo(String tableName, String partitionName, String indexName) {
// pathBuilder.getRoot().print("\t");
DirSaver indexDir = (DirSaver) pathBuilder.getRoot().getChild(getDbName()).getChild(tableName)
.getChild(partitionName).getChild(indexName);
Collection<String> tabletNames = indexDir.getChildrenName();
Set<Long> orderedBackupedTabletIds = Sets.newTreeSet();
for (String tabletName : tabletNames) {
orderedBackupedTabletIds.add(Long.valueOf(tabletName));
}
List<Long> orderedBackupedTabletIdList = Lists.newArrayList(orderedBackupedTabletIds);
return orderedBackupedTabletIdList;
}
private void waitDownload() throws DdlException {
synchronized (unfinishedTabletIds) {
if (!unfinishedTabletIds.isEmpty()) {
LOG.debug("waiting for unfinished download task. size: {}", unfinishedTabletIds.size());
return;
}
}
downloadFinishedTime = System.currentTimeMillis();
LOG.info("all tablets restore finished. job: {}", jobId);
finishing(Catalog.getInstance(), false);
state = RestoreJobState.FINISHED;
}
public void finishing(Catalog catalog, boolean isReplay) throws DdlException {
Database db = catalog.getDb(dbId);
if (db == null && !isReplay) {
throw new DdlException("Database[{}] does not exist");
}
db.writeLock();
try {
// check again if table or partition already exist
for (Map.Entry<String, Table> entry : restoredTables.entrySet()) {
String tableName = entry.getKey();
Table currentTable = db.getTable(tableName);
if (currentTable != null) {
throw new DdlException("Table[" + tableName + "]' already exist. "
+ "Drop table first or restore to another table");
}
}
for (long tableId : restoredPartitions.rowKeySet()) {
Table table = db.getTable(tableId);
if (table == null || table.getType() != TableType.OLAP) {
throw new DdlException("Table[" + tableId + "]' does not exist.");
}
Map<String, Partition> partitions = restoredPartitions.row(tableId);
OlapTable olapTable = (OlapTable) table;
for (Map.Entry<String, Partition> entry : partitions.entrySet()) {
String partitionName = entry.getKey();
Partition currentPartition = olapTable.getPartition(partitionName);
if (currentPartition != null) {
throw new DdlException("Partition[" + partitionName + "]' already exist in table["
+ tableId + "]. Drop partition first or restore to another table");
}
}
}
// add tables
for (Map.Entry<String, Table> entry : restoredTables.entrySet()) {
String tableName = entry.getKey();
Table restoredTable = entry.getValue();
if (restoredTable.getType() == TableType.OLAP) {
OlapTable olapTable = (OlapTable) restoredTable;
olapTable.setState(OlapTableState.NORMAL);
if (isReplay) {
// add inverted index
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
long tableId = olapTable.getId();
for (Partition partition : olapTable.getPartitions()) {
long partitionId = partition.getId();
for (MaterializedIndex index : partition.getMaterializedIndices()) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId,
schemaHash, TStorageMedium.HDD);
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
}
}
}
}
}
db.createTable(restoredTable);
LOG.info("finished add table: {}, job: {}, replay: {}", tableName, jobId, isReplay);
}
// add partitions
for (long tableId : restoredPartitions.rowKeySet()) {
Table table = db.getTable(tableId);
String tableName = table.getName();
Preconditions.checkState(table != null, tableName);
Preconditions.checkState(table.getType() == TableType.OLAP, tableName);
OlapTable olapTable = (OlapTable) table;
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
Preconditions.checkState(partitionInfo.getType() == PartitionType.RANGE);
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
Map<String, Partition> partitions = restoredPartitions.row(tableId);
for (Map.Entry<String, Partition> entry : partitions.entrySet()) {
String partitionName = entry.getKey();
Partition partition = entry.getValue();
long partitionId = partition.getId();
// add restored partition
AlterTableStmt stmt = tableToPartitionStmts.get(tableName, partitionName);
AddPartitionClause clause = (AddPartitionClause) stmt.getOps().get(0);
rangePartitionInfo.handleNewSinglePartitionDesc(clause.getSingeRangePartitionDesc(), partitionId);
olapTable.addPartition(partition);
// add inverted index
if (isReplay) {
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
for (MaterializedIndex index : partition.getMaterializedIndices()) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
long tabletId = tablet.getId();
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId,
schemaHash, TStorageMedium.HDD);
invertedIndex.addTablet(tabletId, tabletMeta);
}
}
}
LOG.info("finished add partition: {}, table: {}, job: {}, replay: {}",
partitionName, tableName, jobId, isReplay);
} // end for partitions
olapTable.setState(OlapTableState.NORMAL);
} // end for tables
} finally {
db.writeUnlock();
}
}
public void handleFinishedRestore(long tabletId, long backendId) {
synchronized (unfinishedTabletIds) {
if (unfinishedTabletIds.remove(tabletId, backendId)) {
LOG.debug("finished restore tablet[{}], backend[{}]", tabletId, backendId);
}
}
}
@Override
public void end(Catalog catalog, boolean isReplay) {
if (state == RestoreJobState.CANCELLED) {
rollback(catalog);
}
// 2. set table state
// restoreTableState(catalog);
if (!isReplay) {
// 3. remove agent tasks if left
removeLeftTasks();
// 4. remove local file
String labelDir = pathBuilder.getRoot().getFullPath();
Util.deleteDirectory(new File(labelDir));
LOG.debug("delete local dir: {}", labelDir);
// 5. remove unused tablet in tablet inverted index
clearInvertedIndex();
finishedTime = System.currentTimeMillis();
// log
Catalog.getInstance().getEditLog().logRestoreFinish(this);
}
// clear for saving memory
clearJob();
LOG.info("finished end job[{}]. state: {}, replay: {}", jobId, state.name(), isReplay);
}
private void clearInvertedIndex() {
TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex();
if (state == RestoreJobState.CANCELLED) {
// clear restored table tablets
for (Table restoredTable : restoredTables.values()) {
if (restoredTable.getType() != TableType.OLAP) {
continue;
}
OlapTable olapTable = (OlapTable) restoredTable;
for (Partition partition : olapTable.getPartitions()) {
for (MaterializedIndex index : partition.getMaterializedIndices()) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
}
}
// partition
for (Partition partition : restoredPartitions.values()) {
for (MaterializedIndex index : partition.getMaterializedIndices()) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
}
}
}
@Override
protected void clearJob() {
tableRenameMap = null;
tableToCreateTableStmt = null;
tableToRollupStmt = null;
tableToPartitionStmts = null;
tableToReplace = null;
restoredTables = null;
restoredPartitions = null;
unfinishedTabletIds = null;
remoteProperties = null;
pathBuilder = null;
commandBuilder = null;
LOG.info("job[{}] cleared for saving memory", jobId);
}
private void rollback(Catalog catalog) {
Database db = catalog.getDb(dbId);
if (db == null) {
errMsg = "Database does not exist[" + getDbName() + "]";
LOG.info("{}. finished restore old meta. job: {}", errMsg, jobId);
return;
}
db.writeLock();
try {
// tables
for (Table restoredTable : restoredTables.values()) {
String tableName = restoredTable.getName();
// use table id rather than table name.
// because table with same name may be created when doing restore.
// find table by name may get unexpected one.
Table currentTable = db.getTable(restoredTable.getId());
// drop restored table
if (currentTable != null) {
db.dropTable(tableName);
LOG.info("drop restored table[{}] in db[{}]", tableName, dbId);
}
}
// partitions
for (long tableId : restoredPartitions.rowKeySet()) {
OlapTable currentTable = (OlapTable) db.getTable(tableId);
if (currentTable == null) {
// table may be dropped during FINISHING phase
continue;
}
// drop restored partitions
for (String partitionName : restoredPartitions.row(tableId).keySet()) {
Partition currentPartition = currentTable.getPartition(partitionName);
if (currentPartition != null) {
currentTable.dropPartition(dbId, partitionName, true);
LOG.info("drop restored partition[{}] in table[{}] in db[{}]",
partitionName, tableId, dbId);
}
currentTable.setState(OlapTableState.NORMAL);
}
}
} finally {
db.writeUnlock();
}
}
private void removeLeftTasks() {
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Text.writeString(out, state.name());
if (tableToPartitionNames == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = tableToPartitionNames.size();
out.writeInt(size);
for (Map.Entry<String, Set<String>> entry : tableToPartitionNames.entrySet()) {
Text.writeString(out, entry.getKey());
Set<String> partitionNames = entry.getValue();
size = partitionNames.size();
out.writeInt(size);
for (String partitionName : partitionNames) {
Text.writeString(out, partitionName);
}
}
}
if (tableRenameMap == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = tableRenameMap.size();
out.writeInt(size);
for (Map.Entry<String, String> entry : tableRenameMap.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
if (tableToCreateTableStmt == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = tableToCreateTableStmt.size();
out.writeInt(size);
for (Map.Entry<String, CreateTableStmt> entry : tableToCreateTableStmt.entrySet()) {
throw new RuntimeException("Don't support CreateTableStmt serialization anymore");
}
}
if (tableToRollupStmt == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = tableToRollupStmt.size();
out.writeInt(size);
for (Map.Entry<String, AlterTableStmt> entry : tableToRollupStmt.entrySet()) {
Text.writeString(out, entry.getKey());
entry.getValue().write(out);
}
}
if (tableToPartitionStmts == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = tableToPartitionStmts.rowKeySet().size();
out.writeInt(size);
for (String tableName : tableToPartitionStmts.rowKeySet()) {
Text.writeString(out, tableName);
Map<String, AlterTableStmt> row = tableToPartitionStmts.row(tableName);
size = row.size();
out.writeInt(size);
for (Map.Entry<String, AlterTableStmt> entry : row.entrySet()) {
Text.writeString(out, entry.getKey());
entry.getValue().write(out);
}
}
}
if (tableToReplace == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = tableToReplace.size();
out.writeInt(size);
for (Map.Entry<String, Boolean> entry : tableToReplace.entrySet()) {
Text.writeString(out, entry.getKey());
out.writeBoolean(entry.getValue());
}
}
if (restoredTables == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = restoredTables.size();
out.writeInt(size);
for (Map.Entry<String, Table> entry : restoredTables.entrySet()) {
Text.writeString(out, entry.getKey());
entry.getValue().write(out);
}
}
if (restoredPartitions == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
int size = restoredPartitions.size();
out.writeInt(size);
for (long tableId : restoredPartitions.rowKeySet()) {
out.writeLong(tableId);
Map<String, Partition> row = restoredPartitions.row(tableId);
size = row.size();
out.writeInt(size);
for (Map.Entry<String, Partition> entry : row.entrySet()) {
Text.writeString(out, entry.getKey());
entry.getValue().write(out);
}
}
}
out.writeLong(metaRestoredTime);
out.writeLong(downloadFinishedTime);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
state = RestoreJobState.valueOf(Text.readString(in));
if (in.readBoolean()) {
tableToPartitionNames = Maps.newHashMap();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String tableName = Text.readString(in);
int count = in.readInt();
Set<String> partitionNames = Sets.newHashSet();
for (int j = 0; j < count; j++) {
String partitionName = Text.readString(in);
partitionNames.add(partitionName);
}
tableToPartitionNames.put(tableName, partitionNames);
}
}
if (in.readBoolean()) {
tableRenameMap = Maps.newHashMap();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String newTableName = Text.readString(in);
String tableName = Text.readString(in);
tableRenameMap.put(newTableName, tableName);
}
}
if (in.readBoolean()) {
tableToCreateTableStmt = Maps.newHashMap();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String tableName = Text.readString(in);
CreateTableStmt stmt = CreateTableStmt.read(in);
tableToCreateTableStmt.put(tableName, stmt);
}
}
if (in.readBoolean()) {
tableToRollupStmt = Maps.newHashMap();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String tableName = Text.readString(in);
AlterTableStmt stmt = new AlterTableStmt();
stmt.readFields(in);
tableToRollupStmt.put(tableName, stmt);
}
}
if (in.readBoolean()) {
tableToPartitionStmts = HashBasedTable.create();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String tableName = Text.readString(in);
int count = in.readInt();
for (int j = 0; j < count; j++) {
String partitionName = Text.readString(in);
AlterTableStmt stmt = new AlterTableStmt();
stmt.readFields(in);
tableToPartitionStmts.put(tableName, partitionName, stmt);
}
}
}
if (in.readBoolean()) {
tableToReplace = Maps.newHashMap();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String tableName = Text.readString(in);
Boolean replace = in.readBoolean();
tableToReplace.put(tableName, replace);
}
}
if (in.readBoolean()) {
restoredTables = Maps.newHashMap();
int size = in.readInt();
for (int i = 0; i < size; i++) {
String tableName = Text.readString(in);
Table table = Table.read(in);
restoredTables.put(tableName, table);
}
}
if (in.readBoolean()) {
restoredPartitions = HashBasedTable.create();
int size = in.readInt();
for (int i = 0; i < size; i++) {
long tableId = in.readLong();
int count = in.readInt();
for (int j = 0; j < count; j++) {
String partitionName = Text.readString(in);
Partition partition = Partition.read(in);
restoredPartitions.put(tableId, partitionName, partition);
}
}
}
metaRestoredTime = in.readLong();
downloadFinishedTime = in.readLong();
}
}

View File

@ -71,10 +71,7 @@ import org.apache.doris.analysis.TableRenameClause;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.UserDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.AbstractBackupJob_D;
import org.apache.doris.backup.BackupHandler;
import org.apache.doris.backup.BackupJob_D;
import org.apache.doris.backup.RestoreJob_D;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Database.DbState;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
@ -83,8 +80,6 @@ import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Table.TableType;
import org.apache.doris.clone.Clone;
import org.apache.doris.clone.CloneChecker;
import org.apache.doris.clone.ColocateTableBalancer;
import org.apache.doris.clone.TabletChecker;
import org.apache.doris.clone.TabletScheduler;
@ -269,7 +264,6 @@ public class Catalog {
private LoadManager loadManager;
private RoutineLoadManager routineLoadManager;
private ExportMgr exportMgr;
private Clone clone;
private Alter alter;
private ConsistencyChecker consistencyChecker;
private BackupHandler backupHandler;
@ -428,7 +422,6 @@ public class Catalog {
this.load = new Load();
this.routineLoadManager = new RoutineLoadManager();
this.exportMgr = new ExportMgr();
this.clone = new Clone();
this.alter = new Alter();
this.consistencyChecker = new ConsistencyChecker();
this.lock = new QueryableReentrantLock(true);
@ -1098,14 +1091,9 @@ public class Catalog {
ExportChecker.init(Config.export_checker_interval_second * 1000L);
ExportChecker.startAll();
// Clone checker
if (!Config.use_new_tablet_scheduler) {
CloneChecker.getInstance().setInterval(Config.clone_checker_interval_second * 1000L);
CloneChecker.getInstance().start();
} else {
tabletChecker.start();
tabletScheduler.start();
}
// Tablet checker and scheduler
tabletChecker.start();
tabletScheduler.start();
// Colocate tables balancer
if (!Config.disable_colocate_join) {
@ -1324,7 +1312,6 @@ public class Catalog {
recreateTabletInvertIndex();
checksum = loadLoadJob(dis, checksum);
checksum = loadAlterJob(dis, checksum);
checksum = loadBackupAndRestoreJob_D(dis, checksum);
checksum = loadAccessService(dis, checksum);
checksum = loadRecycleBin(dis, checksum);
checksum = loadGlobalVariable(dis, checksum);
@ -1641,67 +1628,6 @@ public class Catalog {
return checksum;
}
// This method is deprecated, we keep it because we need to consume the old image
// which contains old backup and restore jobs
@Deprecated
public long loadBackupAndRestoreJob_D(DataInputStream dis, long checksum) throws IOException {
long newChecksum = checksum;
if (getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_22
&& getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_42) {
newChecksum = loadBackupAndRestoreJob_D(dis, newChecksum, BackupJob_D.class);
newChecksum = loadBackupAndRestoreJob_D(dis, newChecksum, RestoreJob_D.class);
newChecksum = loadBackupAndRestoreLabel_D(dis, newChecksum);
}
return newChecksum;
}
@Deprecated
private long loadBackupAndRestoreJob_D(DataInputStream dis, long checksum,
Class<? extends AbstractBackupJob_D> jobClass) throws IOException {
int size = dis.readInt();
long newChecksum = checksum ^ size;
for (int i = 0; i < size; i++) {
long dbId = dis.readLong();
newChecksum ^= dbId;
if (jobClass == BackupJob_D.class) {
BackupJob_D job = new BackupJob_D();
job.readFields(dis);
} else {
RestoreJob_D job = new RestoreJob_D();
job.readFields(dis);
}
}
// finished or cancelled
size = dis.readInt();
newChecksum ^= size;
for (int i = 0; i < size; i++) {
long dbId = dis.readLong();
newChecksum ^= dbId;
if (jobClass == BackupJob_D.class) {
BackupJob_D job = new BackupJob_D();
job.readFields(dis);
} else {
RestoreJob_D job = new RestoreJob_D();
job.readFields(dis);
}
}
return newChecksum;
}
@Deprecated
private long loadBackupAndRestoreLabel_D(DataInputStream dis, long checksum) throws IOException {
int size = dis.readInt();
long newChecksum = checksum ^ size;
for (int i = 0; i < size; i++) {
long dbId = dis.readLong();
newChecksum ^= dbId;
Text.readString(dis); // label
}
return newChecksum;
}
public long loadPaloAuth(DataInputStream dis, long checksum) throws IOException {
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_43) {
// CAN NOT use PaloAuth.read(), cause this auth instance is already passed to DomainResolver
@ -3128,10 +3054,6 @@ public class Catalog {
// drop
olapTable.dropPartition(db.getId(), partitionName);
// remove clone job
Clone clone = Catalog.getInstance().getCloneInstance();
clone.cancelCloneJob(partition);
// log
DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName);
editLog.logDropPartition(info);
@ -4607,10 +4529,6 @@ public class Catalog {
return this.exportMgr;
}
public Clone getCloneInstance() {
return this.clone;
}
public SmallFileMgr getSmallFileMgr() {
return this.smallFileMgr;
}

View File

@ -1,636 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.clone.CloneJob.JobPriority;
import org.apache.doris.clone.CloneJob.JobState;
import org.apache.doris.clone.CloneJob.JobType;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CloneTask;
import org.apache.doris.thrift.TTabletInfo;
import org.apache.doris.thrift.TTaskType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class Clone {
private static final Logger LOG = LogManager.getLogger(Clone.class);
// priority to Map<tabletId, CloneJob>
private Map<JobPriority, Map<Long, CloneJob>> priorityToCloneJobs;
// job num: pending + running
private int jobNum;
// lock for clone job
// lock is private and must use after db lock
private ReentrantReadWriteLock lock;
public Clone() {
// init clone jobs
priorityToCloneJobs = new HashMap<JobPriority, Map<Long, CloneJob>>();
for (JobPriority priority : JobPriority.values()) {
Map<Long, CloneJob> cloneJobs = new LinkedHashMap<Long, CloneJob>();
priorityToCloneJobs.put(priority, cloneJobs);
}
jobNum = 0;
lock = new ReentrantReadWriteLock(true);
}
private void readLock() {
lock.readLock().lock();
}
private void readUnlock() {
lock.readLock().unlock();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
/**
* add clone job
* @return true if add new job else false
*/
public boolean addCloneJob(long dbId, long tableId, long partitionId, long indexId, long tabletId,
long destBackendId, JobType type, JobPriority priority, long timeoutSecond) {
writeLock();
try {
// check priority map
Map<Long, CloneJob> cloneJobs = priorityToCloneJobs.get(priority);
if (cloneJobs.containsKey(tabletId)) {
return false;
}
// check other priority map
CloneJob job = null;
if (priority == JobPriority.NORMAL) {
job = priorityToCloneJobs.get(JobPriority.LOW).remove(tabletId);
} else if (priority == JobPriority.LOW) {
job = priorityToCloneJobs.get(JobPriority.NORMAL).remove(tabletId);
} else if (priority == JobPriority.HIGH) {
job = priorityToCloneJobs.get(JobPriority.HIGH).remove(tabletId);
}
if (job != null) {
job.setPriority(priority);
cloneJobs.put(tabletId, job);
return false;
}
// check job num
// TODO(cmy): for now we limit clone job num for NORMAL and LOW Priority clone job
if (priority != JobPriority.HIGH && jobNum >= Config.clone_max_job_num) {
LOG.debug("too many clone jobs. job num: {}", jobNum);
return false;
}
// add job
job = new CloneJob(dbId, tableId, partitionId, indexId, tabletId, destBackendId,
type, priority, timeoutSecond);
cloneJobs.put(tabletId, job);
++jobNum;
LOG.info("add clone job. job: {}, job num: {}", job, jobNum);
return true;
} finally {
writeUnlock();
}
}
public int getJobNum() {
readLock();
try {
return jobNum;
} finally {
readUnlock();
}
}
public Set<Long> getCloneTabletIds() {
Set<Long> cloneTabletIds = new HashSet<Long>();
readLock();
try {
for (Map<Long, CloneJob> cloneJobs : priorityToCloneJobs.values()) {
cloneTabletIds.addAll(cloneJobs.keySet());
}
return cloneTabletIds;
} finally {
readUnlock();
}
}
public boolean containsTablet(long tabletId) {
readLock();
try {
for (Map<Long, CloneJob> cloneJobs : priorityToCloneJobs.values()) {
if (cloneJobs.containsKey(tabletId)) {
return true;
}
}
return false;
} finally {
readUnlock();
}
}
/**
* get state clone jobs order by priority
*/
public List<CloneJob> getCloneJobs(JobState state) {
List<CloneJob> cloneJobs = new ArrayList<CloneJob>();
readLock();
try {
for (CloneJob job : priorityToCloneJobs.get(JobPriority.HIGH).values()) {
if (job.getState() == state) {
cloneJobs.add(job);
}
}
for (CloneJob job : priorityToCloneJobs.get(JobPriority.NORMAL).values()) {
if (job.getState() == state) {
cloneJobs.add(job);
}
}
for (CloneJob job : priorityToCloneJobs.get(JobPriority.LOW).values()) {
if (job.getState() == state) {
cloneJobs.add(job);
}
}
return cloneJobs;
} finally {
readUnlock();
}
}
/**
* get state clone jobs'num
*/
public int getCloneJobNum(JobState state, long dbId) {
int jobNum = 0;
readLock();
try {
for (CloneJob job : priorityToCloneJobs.get(JobPriority.HIGH).values()) {
if (job.getState() == state && job.getDbId() == dbId) {
++jobNum;
}
}
for (CloneJob job : priorityToCloneJobs.get(JobPriority.NORMAL).values()) {
if (job.getState() == state && job.getDbId() == dbId) {
++jobNum;
}
}
for (CloneJob job : priorityToCloneJobs.get(JobPriority.LOW).values()) {
if (job.getState() == state && job.getDbId() == dbId) {
++jobNum;
}
}
return jobNum;
} finally {
readUnlock();
}
}
/**
* get clone jobs for proc
*/
public List<List<Comparable>> getCloneJobInfosByDb(Database db) {
List<List<Comparable>> cloneJobInfos = new ArrayList<List<Comparable>>();
long dbId = db.getId();
readLock();
try {
for (Map<Long, CloneJob> cloneJobs : priorityToCloneJobs.values()) {
for (CloneJob job : cloneJobs.values()) {
if (job.getDbId() != dbId) {
continue;
}
List<Comparable> jobInfo = new ArrayList<Comparable>();
jobInfo.add(job.getDbId());
jobInfo.add(job.getTableId());
jobInfo.add(job.getPartitionId());
jobInfo.add(job.getIndexId());
jobInfo.add(job.getTabletId());
jobInfo.add(job.getDestBackendId());
jobInfo.add(job.getState().name());
jobInfo.add(job.getType().name());
jobInfo.add(job.getPriority().name());
CloneTask cloneTask = job.getCloneTask();
if (cloneTask != null) {
jobInfo.add(cloneTask.getVisibleVersion());
jobInfo.add(cloneTask.getVisibleVersionHash());
jobInfo.add(cloneTask.getFailedTimes());
} else {
jobInfo.add(-1L);
jobInfo.add(-1L);
jobInfo.add(0);
}
jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
jobInfo.add(TimeUtils.longToTimeString(job.getCloneStartTimeMs()));
jobInfo.add(TimeUtils.longToTimeString(job.getCloneFinishTimeMs()));
jobInfo.add(job.getTimeoutMs() / 1000);
jobInfo.add(job.getFailMsg());
cloneJobInfos.add(jobInfo);
}
}
} finally {
readUnlock();
}
// sort by create time
ListComparator<List<Comparable>> comparator = new ListComparator<List<Comparable>>(9);
Collections.sort(cloneJobInfos, comparator);
return cloneJobInfos;
}
/**
* add task to task queue and update job running
*/
public boolean runCloneJob(CloneJob job, CloneTask task) {
writeLock();
try {
if (job.getState() != JobState.PENDING) {
LOG.warn("clone job state is not pending. job: {}", job);
return false;
}
if (AgentTaskQueue.addTask(task)) {
job.setState(JobState.RUNNING);
job.setCloneStartTimeMs(System.currentTimeMillis());
job.setCloneTask(task);
return true;
} else {
return false;
}
} finally {
writeUnlock();
}
}
/**
* check job timeout
*/
public void checkTimeout() {
long currentTimeMs = System.currentTimeMillis();
writeLock();
try {
for (Map<Long, CloneJob> cloneJobs : priorityToCloneJobs.values()) {
for (CloneJob job : cloneJobs.values()) {
JobState state = job.getState();
if (state == JobState.PENDING || state == JobState.RUNNING) {
if (currentTimeMs - job.getCreateTimeMs() > job.getTimeoutMs()) {
cancelCloneJob(job, "clone timeout");
LOG.warn("clone timeout. job: {}, src state: {}", job, state.name());
}
}
}
}
} finally {
writeUnlock();
}
}
/**
* cancel clone job by job
*/
public void cancelCloneJob(CloneJob job, String failMsg) {
writeLock();
try {
JobState state = job.getState();
if (state != JobState.PENDING && state != JobState.RUNNING) {
LOG.warn("clone job state is not pending or running. job: {}", job);
return;
}
// remove clone task
AgentTaskQueue.removeTask(job.getDestBackendId(), TTaskType.CLONE, job.getTabletId());
// the cloned replica will be removed from meta when we remove clone job
// update job state
job.setState(JobState.CANCELLED);
job.setFailMsg(failMsg);
} finally {
writeUnlock();
}
LOG.warn("cancel clone job. job: {}", job);
}
/**
* cancel clone job by tabletId
*/
public void cancelCloneJob(long tabletId, String failMsg) {
writeLock();
try {
for (Map<Long, CloneJob> cloneJobs : priorityToCloneJobs.values()) {
if (cloneJobs.containsKey(tabletId)) {
cancelCloneJob(cloneJobs.get(tabletId), failMsg);
return;
}
}
} finally {
writeUnlock();
}
}
/**
* cancel clone jobs in table. must use with db lock outside!!!!!!
*/
public void cancelCloneJob(OlapTable olapTable) {
for (Partition partition : olapTable.getPartitions()) {
cancelCloneJob(partition);
}
}
/**
* cancel clone jobs in partition. must use with db lock outside!!!!!!
*/
public void cancelCloneJob(Partition partition) {
String failMsg = "partition[" + partition.getName() + "] has been dropped";
for (MaterializedIndex materializedIndex : partition.getMaterializedIndices()) {
for (Tablet tablet : materializedIndex.getTablets()) {
cancelCloneJob(tablet.getId(), failMsg);
}
}
}
public void finishCloneJob(CloneTask task, TTabletInfo tabletInfo) {
// get clone job
long tabletId = task.getTabletId();
CloneJob job = null;
readLock();
try {
for (Map<Long, CloneJob> cloneJobs : priorityToCloneJobs.values()) {
if (cloneJobs.containsKey(tabletId)) {
job = cloneJobs.get(tabletId);
break;
}
}
} finally {
readUnlock();
}
if (job == null) {
LOG.warn("clone job does not exist. tablet id: {}", tabletId);
return;
}
// update meta
long dbId = task.getDbId();
long tableId = task.getTableId();
long partitionId = task.getPartitionId();
long indexId = task.getIndexId();
long backendId = task.getBackendId();
int schemaHash = task.getSchemaHash();
long taskVersion = task.getVisibleVersion();
long taskVersionHash = task.getVisibleVersionHash();
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
String failMsg = "db does not exist. id: " + dbId;
LOG.warn(failMsg);
cancelCloneJob(job, failMsg);
return;
}
db.writeLock();
try {
OlapTable olapTable = (OlapTable) db.getTable(tableId);
if (olapTable == null) {
throw new MetaNotFoundException("table does not exist. id: " + tableId);
}
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
throw new MetaNotFoundException("partition does not exist. id: " + partitionId);
}
MaterializedIndex index = partition.getIndex(indexId);
if (index == null) {
throw new MetaNotFoundException("index does not exist. id: " + indexId);
}
if (schemaHash != olapTable.getSchemaHashByIndexId(indexId)) {
throw new MetaNotFoundException("schema hash is not consistent. index's: "
+ olapTable.getSchemaHashByIndexId(indexId)
+ ", task's: " + schemaHash);
}
Tablet tablet = index.getTablet(tabletId);
if (tablet == null) {
throw new MetaNotFoundException("tablet does not exist. id: " + tabletId);
}
Replica replica = tablet.getReplicaByBackendId(backendId);
if (replica == null) {
throw new MetaNotFoundException("replica does not exist. tablet id: " + tabletId
+ ", backend id: " + backendId);
}
// Here we do not check is clone version is equal to the committed version.
// Because in case of high frequency loading, clone version always lags behind the committed version,
// so the clone job will never succeed, which cause accumulation of quorum finished load jobs.
// But we will check if the cloned replica's version is larger than or equal to the task's version.
// We should discard the cloned replica with stale version.
if (tabletInfo.getVersion() < taskVersion
|| (tabletInfo.getVersion() == taskVersion && tabletInfo.getVersion_hash() != taskVersionHash)) {
throw new MetaNotFoundException(String.format("cloned replica's version info is stale. %d-%d,"
+ " expected: %d-%d",
tabletInfo.getVersion(), tabletInfo.getVersion_hash(),
taskVersion, taskVersionHash));
}
long version = tabletInfo.getVersion();
long versionHash = tabletInfo.getVersion_hash();
long rowCount = tabletInfo.getRow_count();
long dataSize = tabletInfo.getData_size();
writeLock();
try {
if (job.getState() != JobState.RUNNING) {
LOG.warn("clone job state is not running. job: {}", job);
return;
}
// if clone finished and report version == last failed version then last failed version hash should equal
if (replica.getLastFailedVersion() == version && replica.getLastFailedVersionHash() != versionHash) {
throw new MetaNotFoundException(String.format("clone finished and report version %d, "
+ "version hash %d, but the replica's current failed version "
+ "is %d version hash is %d",
version, versionHash, replica.getLastFailedVersion(),
replica.getLastFailedVersionHash()));
}
replica.setState(ReplicaState.NORMAL);
replica.updateVersionInfo(version, versionHash, dataSize, rowCount);
job.setCloneFinishTimeMs(System.currentTimeMillis());
job.setState(JobState.FINISHED);
// yiguolei:
// there are two types of clone job: catch up clone or new replica add to tablet
// for new replica add to tablet, set its last failed version to max commit version for the tablet
// and the new replica will try to clone, if clone finished and the version < last failed version
// the clone type is converted to catchup clone
ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(dbId, tableId, partitionId, indexId,
tabletId, backendId, replica.getId(),
version, versionHash,
schemaHash, dataSize, rowCount,
replica.getLastFailedVersion(),
replica.getLastFailedVersionHash(),
replica.getLastSuccessVersion(),
replica.getLastSuccessVersionHash());
LOG.info("finish clone job: {}, add a replica {}", job, info);
Catalog.getInstance().getEditLog().logAddReplica(info);
} finally {
writeUnlock();
}
} catch (MetaNotFoundException e) {
cancelCloneJob(job, e.getMessage());
} finally {
db.writeUnlock();
}
}
/**
* remove finished or cancelled clone job
*/
public void removeCloneJobs() {
List<CloneJob> cancelledJobs = new ArrayList<CloneJob>();
writeLock();
try {
for (Map<Long, CloneJob> cloneJobs : priorityToCloneJobs.values()) {
Iterator<Map.Entry<Long, CloneJob>> iterator = cloneJobs.entrySet().iterator();
while (iterator.hasNext()) {
CloneJob job = iterator.next().getValue();
JobState state = job.getState();
if (state == JobState.FINISHED || state == JobState.CANCELLED) {
iterator.remove();
--jobNum;
LOG.info("remove clone job. job: {}, job num: {}", job, jobNum);
if (state == JobState.CANCELLED) {
cancelledJobs.add(job);
}
}
}
}
} finally {
writeUnlock();
}
// remove cancelled job clone state replica
if (cancelledJobs.isEmpty()) {
return;
}
for (CloneJob job : cancelledJobs) {
long dbId = job.getDbId();
long tableId = job.getTableId();
long partitionId = job.getPartitionId();
long indexId = job.getIndexId();
long tabletId = job.getTabletId();
long backendId = job.getDestBackendId();
Database db = Catalog.getInstance().getDb(dbId);
if (db == null) {
LOG.warn("db does not exist. id: {}", dbId);
return;
}
db.writeLock();
try {
OlapTable olapTable = (OlapTable) db.getTable(tableId);
if (olapTable == null) {
throw new MetaNotFoundException("table does not exist. id: " + tableId);
}
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
throw new MetaNotFoundException("partition does not exist. id: " + partitionId);
}
MaterializedIndex index = partition.getIndex(indexId);
if (index == null) {
throw new MetaNotFoundException("index does not exist. id: " + indexId);
}
Tablet tablet = index.getTablet(tabletId);
if (tablet == null) {
throw new MetaNotFoundException("tablet does not exist. id: " + tabletId);
}
Replica replica = tablet.getReplicaByBackendId(backendId);
if (replica == null) {
LOG.info("could not find replica on backend {} for tablet id {}, "
+ "maybe clone not find src backends, ignore it",
backendId, tabletId);
return;
}
// 1. if this is a normal clone job, then should remove it from meta, not write log, because the clone replica
// not exist on follower and observer
// 2. if this is a catch up clone job, should not delete it from meta because the catch up replica is a normal replica
// before clone and we will lost data if delete the catch up clone replica
if (replica.getState() == ReplicaState.CLONE) {
if (job.getType() == JobType.CATCHUP) {
replica.setState(ReplicaState.NORMAL);
} else {
if (tablet.deleteReplicaByBackendId(backendId)) {
LOG.info("remove clone replica. tablet id: {}, backend id: {}", tabletId, backendId);
}
}
}
} catch (MetaNotFoundException e) {
LOG.warn("meta not found, error: {}", e.getMessage());
} finally {
db.writeUnlock();
}
}
}
/**
* calculate clone job priority
* @return HIGH if online replica num is lower than quorum else LOW
*/
public static JobPriority calculatePriority(short onlineReplicaNum, short replicationNum) {
JobPriority priority = JobPriority.LOW;
short quorumReplicationNum = (short) (replicationNum / 2 + 1);
if (onlineReplicaNum < quorumReplicationNum) {
priority = JobPriority.NORMAL;
}
return priority;
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,174 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import org.apache.doris.task.CloneTask;
public class CloneJob {
public enum JobPriority {
HIGH,
NORMAL,
LOW
}
public enum JobState {
PENDING,
RUNNING,
FINISHED,
CANCELLED
}
public enum JobType {
SUPPLEMENT,
MIGRATION,
CATCHUP
}
private long dbId;
private long tableId;
private long partitionId;
private long indexId;
private long tabletId;
private long destBackendId;
private JobState state;
private JobType type;
private JobPriority priority;
private long createTimeMs;
private long cloneStartTimeMs;
private long cloneFinishTimeMs;
private long timeoutMs;
private String failMsg;
private CloneTask cloneTask;
public CloneJob(long dbId, long tableId, long partitionId, long indexId, long tabletId,
long destBackendId, JobType type, JobPriority priority, long timeoutMs) {
this.dbId = dbId;
this.tableId = tableId;
this.partitionId = partitionId;
this.indexId = indexId;
this.tabletId = tabletId;
this.destBackendId = destBackendId;
this.state = JobState.PENDING;
this.type = type;
this.priority = priority;
this.createTimeMs = System.currentTimeMillis();
this.cloneStartTimeMs = -1;
this.cloneFinishTimeMs = -1;
this.timeoutMs = timeoutMs;
this.failMsg = "";
}
public long getDbId() {
return dbId;
}
public long getTableId() {
return tableId;
}
public long getPartitionId() {
return partitionId;
}
public long getIndexId() {
return indexId;
}
public long getTabletId() {
return tabletId;
}
public long getDestBackendId() {
return destBackendId;
}
public JobState getState() {
return state;
}
public void setState(JobState state) {
this.state = state;
}
public JobType getType() {
return type;
}
public JobPriority getPriority() {
return priority;
}
public void setPriority(JobPriority priority) {
this.priority = priority;
}
public long getCreateTimeMs() {
return createTimeMs;
}
public void setCreateTimeMs(long createTimeMs) {
this.createTimeMs = createTimeMs;
}
public long getCloneStartTimeMs() {
return cloneStartTimeMs;
}
public void setCloneStartTimeMs(long cloneStartTimeMs) {
this.cloneStartTimeMs = cloneStartTimeMs;
}
public long getCloneFinishTimeMs() {
return cloneFinishTimeMs;
}
public void setCloneFinishTimeMs(long cloneFinishTimeMs) {
this.cloneFinishTimeMs = cloneFinishTimeMs;
}
public long getTimeoutMs() {
return timeoutMs;
}
public String getFailMsg() {
return failMsg;
}
public void setFailMsg(String failMsg) {
this.failMsg = failMsg;
}
public void setCloneTask(CloneTask task) {
this.cloneTask = task;
}
public CloneTask getCloneTask() {
return this.cloneTask;
}
@Override
public String toString() {
return "CloneJob [dbId=" + dbId + ", tableId=" + tableId + ", partitionId=" + partitionId
+ ", indexId=" + indexId + ", tabletId=" + tabletId + ", destBackendId=" + destBackendId
+ ", state=" + state + ", type=" + type + ", priority=" + priority
+ ", createTimeMs=" + createTimeMs + ", cloneStartTimeMs=" + cloneStartTimeMs
+ ", cloneFinishTimeMs=" + cloneFinishTimeMs + ", timeoutMs=" + timeoutMs + ", failMsg=" + failMsg
+ "]";
}
}

View File

@ -553,13 +553,10 @@ public class Config extends ConfigBase {
public static int query_colocate_join_memory_limit_penalty_factor = 8;
/*
* co-location join is an experimental feature now.
* Set to false if you know what it is and really want to use it.
* if set to false, 'use_new_tablet_scheduler' must be set to false, because the new TabletScheduler
* can not handle tablet repair for colocate tables.
* Deprecated after 0.10
*/
@ConfField
public static boolean disable_colocate_join = true;
public static boolean disable_colocate_join = false;
/*
* The default user resource publishing timeout.
*/
@ -755,9 +752,7 @@ public class Config extends ConfigBase {
@ConfField public static int schedule_slot_num_per_path = 2;
/*
* set to true to use the TabletScheduler instead of the old CloneChecker.
* if set to true, 'disable_colocate_join' must be set to true.
* Because the new TabeltScheduler can not handle tablet repair for colocate tables.
* Deprecated after 0.10
*/
@ConfField public static boolean use_new_tablet_scheduler = true;

View File

@ -1,67 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.proc;
import org.apache.doris.catalog.Database;
import org.apache.doris.clone.Clone;
import org.apache.doris.common.AnalysisException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
public class CloneProcNode implements ProcNodeInterface {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("DbId").add("TableId").add("PartitionId").add("IndexId")
.add("TabletId").add("BackendId").add("State").add("Type")
.add("Priority").add("CloneVersion").add("CloneVersionHash")
.add("TaskFailTimes")
.add("CreateTime").add("StartTime").add("FinishTime")
.add("Timeout(s)").add("FailMsg")
.build();
private Clone clone;
private Database db;
public CloneProcNode(Clone clone, Database db) {
this.clone = clone;
this.db = db;
}
@Override
public ProcResult fetchResult() throws AnalysisException {
Preconditions.checkNotNull(db);
Preconditions.checkNotNull(clone);
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);
List<List<Comparable>> cloneJobInfos = clone.getCloneJobInfosByDb(db);
for (List<Comparable> infoStr : cloneJobInfos) {
List<String> oneInfo = new ArrayList<String>(TITLE_NAMES.size());
for (Comparable element : infoStr) {
oneInfo.add(element.toString());
}
result.addRow(oneInfo);
}
return result;
}
}

View File

@ -21,7 +21,6 @@ import org.apache.doris.alter.RollupHandler;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.clone.CloneJob.JobState;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportMgr;
@ -43,7 +42,6 @@ public class JobsProcDir implements ProcDirInterface {
.add("Cancelled").add("Total")
.build();
private static final String CLONE = "clone";
private static final String LOAD = "load";
private static final String DELETE = "delete";
private static final String ROLLUP = "rollup";
@ -69,9 +67,7 @@ public class JobsProcDir implements ProcDirInterface {
throw new AnalysisException("Job type name is null");
}
if (jobTypeName.equals(CLONE)) {
return new CloneProcNode(catalog.getCloneInstance(), db);
} else if (jobTypeName.equals(LOAD)) {
if (jobTypeName.equals(LOAD)) {
return new LoadProcDir(catalog.getLoadInstance(), db);
} else if (jobTypeName.equals(DELETE)) {
return new DeleteInfoProcDir(catalog.getLoadInstance(), db.getId());
@ -95,29 +91,20 @@ public class JobsProcDir implements ProcDirInterface {
result.setNames(TITLE_NAMES);
long dbId = db.getId();
// clone
Integer pendingNum = Catalog.getInstance().getCloneInstance().getCloneJobNum(JobState.PENDING, dbId);
Integer runningNum = Catalog.getInstance().getCloneInstance().getCloneJobNum(JobState.RUNNING, dbId);
Integer finishedNum = Catalog.getInstance().getCloneInstance().getCloneJobNum(JobState.FINISHED, dbId);
Integer cancelledNum = Catalog.getInstance().getCloneInstance().getCloneJobNum(JobState.CANCELLED, dbId);
Integer totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(CLONE, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));
// load
Load load = Catalog.getInstance().getLoadInstance();
LoadManager loadManager = Catalog.getCurrentCatalog().getLoadManager();
pendingNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.PENDING, dbId)
Integer pendingNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.PENDING, dbId)
+ loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.PENDING, dbId);
runningNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.ETL, dbId)
Integer runningNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.ETL, dbId)
+ load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.LOADING, dbId)
+ loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.LOADING, dbId);
finishedNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.QUORUM_FINISHED, dbId)
Integer finishedNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.QUORUM_FINISHED, dbId)
+ load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.FINISHED, dbId)
+ loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.FINISHED, dbId);
cancelledNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.CANCELLED, dbId)
Integer cancelledNum = load.getLoadJobNum(org.apache.doris.load.LoadJob.JobState.CANCELLED, dbId)
+ loadManager.getLoadJobNum(org.apache.doris.load.loadv2.JobState.CANCELLED, dbId);
totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
Integer totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(LOAD, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));

View File

@ -20,10 +20,8 @@ package org.apache.doris.journal;
import org.apache.doris.alter.AlterJob;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.BackupJob;
import org.apache.doris.backup.BackupJob_D;
import org.apache.doris.backup.Repository;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.backup.RestoreJob_D;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Function;
@ -43,11 +41,9 @@ import org.apache.doris.load.LoadJob;
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.mysql.privilege.UserProperty;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.persist.BackendIdsUpdateInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.CloneInfo;
import org.apache.doris.persist.ClusterInfo;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.ConsistencyCheckInfo;
@ -195,17 +191,6 @@ public class JournalEntity implements Writable {
data = new TableInfo();
break;
}
case OperationType.OP_BACKUP_START:
case OperationType.OP_BACKUP_FINISH_SNAPSHOT:
case OperationType.OP_BACKUP_FINISH: {
data = new BackupJob_D();
break;
}
case OperationType.OP_RESTORE_START:
case OperationType.OP_RESTORE_FINISH: {
data = new RestoreJob_D();
break;
}
case OperationType.OP_BACKUP_JOB: {
data = BackupJob.read(in);
needRead = false;
@ -244,10 +229,6 @@ public class JournalEntity implements Writable {
needRead = false;
break;
}
case OperationType.OP_CLONE_DONE: {
data = new CloneInfo();
break;
}
case OperationType.OP_ADD_REPLICA:
case OperationType.OP_UPDATE_REPLICA:
case OperationType.OP_DELETE_REPLICA:
@ -272,14 +253,6 @@ public class JournalEntity implements Writable {
data = new LoadErrorHub.Param();
break;
}
case OperationType.OP_ALTER_ACCESS_RESOURCE: {
data = new UserProperty();
break;
}
case OperationType.OP_DROP_USER: {
data = new Text();
break;
}
case OperationType.OP_NEW_DROP_USER: {
data = UserIdentity.read(in);
needRead = false;
@ -325,10 +298,6 @@ public class JournalEntity implements Writable {
data = new ClusterInfo();
break;
}
case OperationType.OP_UPDATE_CLUSTER: {
data = new ClusterInfo();
break;
}
case OperationType.OP_EXPAND_CLUSTER: {
data = new ClusterInfo();
break;

View File

@ -28,8 +28,6 @@ import org.apache.doris.load.AsyncDeleteJob;
import org.apache.doris.load.DeleteInfo;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.mysql.privilege.UserProperty;
import org.apache.doris.persist.CloneInfo;
import org.apache.doris.persist.ConsistencyCheckInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.DatabaseInfo;
@ -324,11 +322,6 @@ public final class LocalJournalCursor implements JournalCursor {
ret.setData(deleteJob);
break;
}
case OperationType.OP_CLONE_DONE: {
CloneInfo info = CloneInfo.read(in);
ret.setData(info);
break;
}
case OperationType.OP_ADD_REPLICA:
case OperationType.OP_DELETE_REPLICA: {
ReplicaPersistInfo info = ReplicaPersistInfo.read(in);
@ -355,18 +348,6 @@ public final class LocalJournalCursor implements JournalCursor {
ret.setData(param);
break;
}
case OperationType.OP_ALTER_ACCESS_RESOURCE: {
UserProperty resource = new UserProperty();
resource.readFields(in);
ret.setData(resource);
break;
}
case OperationType.OP_DROP_USER: {
Text text = new Text();
text.readFields(in);
ret.setData(text);
break;
}
case OperationType.OP_MASTER_INFO_CHANGE: {
MasterInfo info = new MasterInfo();
info.readFields(in);

View File

@ -689,23 +689,13 @@ public class MasterImpl {
private void finishClone(AgentTask task, TFinishTaskRequest request) {
CloneTask cloneTask = (CloneTask) task;
if (cloneTask.getTaskVersion() == CloneTask.VERSION_1) {
if (request.getTask_status().getStatus_code() != TStatusCode.OK) {
// just return, like the old style
return;
}
List<TTabletInfo> finishTabletInfos = request.getFinish_tablet_infos();
Preconditions.checkArgument(finishTabletInfos != null && !finishTabletInfos.isEmpty());
Preconditions.checkArgument(finishTabletInfos.size() == 1);
Catalog.getInstance().getCloneInstance().finishCloneJob(cloneTask, finishTabletInfos.get(0));
} else if (cloneTask.getTaskVersion() == CloneTask.VERSION_2) {
if (cloneTask.getTaskVersion() == CloneTask.VERSION_2) {
Catalog.getCurrentCatalog().getTabletScheduler().finishCloneTask(cloneTask, request);
} else {
LOG.warn("invalid clone task, ignore it. {}", task);
}
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.CLONE, task.getSignature());
}
private void finishConsistenctCheck(AgentTask task, TFinishTaskRequest request) {

View File

@ -29,7 +29,6 @@ import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.Tablet.TabletStatus;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.clone.CloneChecker;
import org.apache.doris.clone.TabletSchedCtx;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
@ -597,12 +596,6 @@ public class ReportHandler extends Daemon {
replicas = tablet.getReplicas();
if (replicas.size() == 0) {
LOG.error("invalid situation. tablet[{}] is empty", tabletId);
} else if (replicas.size() < replicationNum) {
if (!Config.use_new_tablet_scheduler) {
CloneChecker.getInstance().checkTabletForSupplement(dbId, tableId,
partitionId,
indexId, tabletId);
}
}
}
} // end for tabletMetas

View File

@ -22,10 +22,8 @@ import org.apache.doris.alter.RollupJob;
import org.apache.doris.alter.SchemaChangeJob;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.BackupJob;
import org.apache.doris.backup.BackupJob_D;
import org.apache.doris.backup.Repository;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.backup.RestoreJob_D;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
@ -56,7 +54,6 @@ import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.UserProperty;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.system.Backend;
@ -249,17 +246,6 @@ public class EditLog {
catalog.replayRenamePartition(info);
break;
}
case OperationType.OP_BACKUP_START:
case OperationType.OP_BACKUP_FINISH_SNAPSHOT:
case OperationType.OP_BACKUP_FINISH: {
BackupJob_D job = (BackupJob_D) journal.getData();
break;
}
case OperationType.OP_RESTORE_START:
case OperationType.OP_RESTORE_FINISH: {
RestoreJob_D job = (RestoreJob_D) journal.getData();
break;
}
case OperationType.OP_BACKUP_JOB: {
BackupJob job = (BackupJob) journal.getData();
catalog.getBackupHandler().replayAddJob(job);
@ -451,16 +437,6 @@ public class EditLog {
}
break;
}
case OperationType.OP_ALTER_ACCESS_RESOURCE: {
UserProperty userProperty = (UserProperty) journal.getData();
catalog.getAuth().replayAlterAccess(userProperty);
break;
}
case OperationType.OP_DROP_USER: {
String userName = ((Text) journal.getData()).toString();
catalog.getAuth().replayOldDropUser(userName);
break;
}
case OperationType.OP_CREATE_USER: {
PrivInfo privInfo = (PrivInfo) journal.getData();
catalog.getAuth().replayCreateUser(privInfo);
@ -977,15 +953,6 @@ public class EditLog {
logEdit(OperationType.OP_BACKEND_STATE_CHANGE, be);
}
public void logAlterAccess(UserProperty userProperty) {
logEdit(OperationType.OP_ALTER_ACCESS_RESOURCE, userProperty);
}
@Deprecated
public void logDropUser(String userName) {
logEdit(OperationType.OP_DROP_USER, new Text(userName));
}
public void logCreateUser(PrivInfo info) {
logEdit(OperationType.OP_CREATE_USER, info);
}
@ -1042,26 +1009,6 @@ public class EditLog {
logEdit(OperationType.OP_RENAME_PARTITION, tableInfo);
}
public void logBackupStart(BackupJob_D backupJob) {
logEdit(OperationType.OP_BACKUP_START, backupJob);
}
public void logBackupFinishSnapshot(BackupJob_D backupJob) {
logEdit(OperationType.OP_BACKUP_FINISH_SNAPSHOT, backupJob);
}
public void logBackupFinish(BackupJob_D backupJob) {
logEdit(OperationType.OP_BACKUP_FINISH, backupJob);
}
public void logRestoreJobStart(RestoreJob_D restoreJob) {
logEdit(OperationType.OP_RESTORE_START, restoreJob);
}
public void logRestoreFinish(RestoreJob_D restoreJob) {
logEdit(OperationType.OP_RESTORE_FINISH, restoreJob);
}
public void logGlobalVariable(SessionVariable variable) {
logEdit(OperationType.OP_GLOBAL_VARIABLE, variable);
}

View File

@ -39,16 +39,6 @@ public class OperationType {
public static final short OP_RECOVER_PARTITION = 18;
public static final short OP_RENAME_TABLE = 19;
public static final short OP_RENAME_PARTITION = 110;
@Deprecated
public static final short OP_BACKUP_START = 111;
@Deprecated
public static final short OP_BACKUP_FINISH_SNAPSHOT = 112;
@Deprecated
public static final short OP_BACKUP_FINISH = 113;
@Deprecated
public static final short OP_RESTORE_START = 114;
@Deprecated
public static final short OP_RESTORE_FINISH = 115;
public static final short OP_BACKUP_JOB = 116;
public static final short OP_RESTORE_JOB = 117;
public static final short OP_TRUNCATE_TABLE = 118;
@ -77,11 +67,6 @@ public class OperationType {
public static final short OP_EXPORT_UPDATE_STATE = 37;
public static final short OP_FINISH_SYNC_DELETE = 40;
@Deprecated
// (cmy 2015-07-22)
// do not use it anymore,use OP_ADD_REPLICA and OP_DELETE_REPLICA instead.
// remove later
public static final short OP_CLONE_DONE = 41;
public static final short OP_ADD_REPLICA = 42;
public static final short OP_DELETE_REPLICA = 43;
public static final short OP_FINISH_ASYNC_DELETE = 44;
@ -98,9 +83,6 @@ public class OperationType {
public static final short OP_REMOVE_FRONTEND = 57;
public static final short OP_SET_LOAD_ERROR_HUB = 58;
public static final short OP_HEARTBEAT = 59;
public static final short OP_ALTER_ACCESS_RESOURCE = 60;
@Deprecated
public static final short OP_DROP_USER = 61;
public static final short OP_CREATE_USER = 62;
public static final short OP_NEW_DROP_USER = 63;
public static final short OP_GRANT_PRIV = 64;
@ -123,8 +105,6 @@ public class OperationType {
public static final short OP_LINK_CLUSTER = 78;
public static final short OP_ENTER_CLUSTER = 79;
public static final short OP_SHOW_CLUSTERS = 80;
@Deprecated
public static final short OP_UPDATE_CLUSTER = 81;
public static final short OP_UPDATE_DB = 82;
public static final short OP_DROP_LINKDB = 83;

View File

@ -1,296 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.clone.CloneJob.JobPriority;
import org.apache.doris.clone.CloneJob.JobState;
import org.apache.doris.clone.CloneJob.JobType;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.UnitTestUtil;
import org.apache.doris.persist.EditLog;
import org.apache.doris.task.CloneTask;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TTabletInfo;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
import java.util.List;
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({ "org.apache.log4j.*", "javax.management.*" })
@PrepareForTest({ Catalog.class })
public class CloneTest {
private Clone clone;
private long dbId;
private long tableId;
private long partitionId;
private long indexId;
@Before
public void setUp() {
clone = new Clone();
dbId = 0L;
tableId = 0L;
partitionId = 0L;
indexId = 0L;
Catalog.getCurrentInvertedIndex().clear();
}
@Test
public void testAddCloneJob() {
// check job num
Assert.assertEquals(0, clone.getJobNum());
// add tablet0 clone job
long tabletId = 0L;
long backendId = 0L;
JobType type = JobType.SUPPLEMENT;
JobPriority priority = JobPriority.LOW;
long timeoutSecond = 3600L;
Assert.assertTrue(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority, timeoutSecond));
// add same tablet clone job
Assert.assertFalse(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority,
timeoutSecond));
// check job num
Assert.assertEquals(1, clone.getJobNum());
// check job state and priority
List<CloneJob> pendingJobs = clone.getCloneJobs(JobState.PENDING);
Assert.assertEquals(1, pendingJobs.size());
CloneJob job = pendingJobs.get(0);
Assert.assertEquals(JobState.PENDING, job.getState());
Assert.assertEquals(JobPriority.LOW, job.getPriority());
// change job priority
priority = JobPriority.NORMAL;
Assert.assertFalse(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority,
timeoutSecond));
pendingJobs = clone.getCloneJobs(JobState.PENDING);
Assert.assertEquals(1, pendingJobs.size());
job = pendingJobs.get(0);
Assert.assertEquals(JobState.PENDING, job.getState());
Assert.assertEquals(JobPriority.NORMAL, job.getPriority());
// test job num threshold
Config.clone_max_job_num = 2;
// add tablet1 clone job
tabletId = 1L;
Assert.assertTrue(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority,
timeoutSecond));
// add tablet2 low priority clone job error
tabletId = 2L;
priority = JobPriority.LOW;
Assert.assertFalse(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority,
timeoutSecond));
// add tablet2 high priority clone job success
priority = JobPriority.NORMAL;
Assert.assertFalse(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority, timeoutSecond));
}
@Test
public void testCheckTimeout() {
// add two clone job
long tabletId = 0L;
long backendId = 0L;
JobType type = JobType.SUPPLEMENT;
JobPriority priority = JobPriority.LOW;
long timeoutSecond = 3600L;
Assert.assertTrue(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority, timeoutSecond));
Assert.assertTrue(clone.getCloneTabletIds().contains(tabletId));
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1, TStorageMedium.HDD);
Catalog.getCurrentInvertedIndex().addTablet(tabletId, tabletMeta);
Replica replica = new Replica();
Catalog.getCurrentInvertedIndex().addReplica(tabletId, replica);
tabletId = 1L;
Assert.assertTrue(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority, timeoutSecond));
Assert.assertTrue(clone.getCloneTabletIds().contains(tabletId));
Assert.assertEquals(2, clone.getJobNum());
Replica replica2 = new Replica();
Catalog.getCurrentInvertedIndex().addTablet(tabletId, tabletMeta);
Catalog.getCurrentInvertedIndex().addReplica(tabletId, replica2);
// change tablet0 creationTime
List<CloneJob> pendingJobs = clone.getCloneJobs(JobState.PENDING);
for (CloneJob job : pendingJobs) {
if (job.getTabletId() == 0L) {
job.setCreateTimeMs(job.getCreateTimeMs() - 3600 * 1000L - 1L);
}
}
// check timeout
clone.checkTimeout();
Assert.assertEquals(2, clone.getJobNum());
// remove cancelled clone job
clone.removeCloneJobs();
pendingJobs = clone.getCloneJobs(JobState.PENDING);
Assert.assertEquals(1, pendingJobs.size());
CloneJob job = pendingJobs.get(0);
Assert.assertEquals(1L, job.getTabletId());
}
@Test
public void testCancelCloneJob() {
// add tablet0 clone job
long tabletId = 0L;
long backendId = 0L;
JobType type = JobType.SUPPLEMENT;
JobPriority priority = JobPriority.LOW;
long timeoutSecond = 3600L;
Assert.assertTrue(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority, timeoutSecond));
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1, TStorageMedium.HDD);
Catalog.getCurrentInvertedIndex().addTablet(tabletId, tabletMeta);
Replica replica = new Replica();
Catalog.getCurrentInvertedIndex().addReplica(tabletId, replica);
// cancel clone job
List<CloneJob> pendingJobs = clone.getCloneJobs(JobState.PENDING);
CloneJob job = pendingJobs.get(0);
job.setState(JobState.RUNNING);
clone.cancelCloneJob(job, "timeout");
Assert.assertEquals(1, clone.getJobNum());
List<CloneJob> cancelledJobs = clone.getCloneJobs(JobState.CANCELLED);
Assert.assertEquals(1, cancelledJobs.size());
Assert.assertEquals("timeout", cancelledJobs.get(0).getFailMsg());
// remove cancelled clone jobs
clone.removeCloneJobs();
Assert.assertEquals(0, clone.getJobNum());
}
@Test
public void testFinishCloneJob() {
// add tablet0 clone job
long tabletId = 0L;
long backendId = 0L;
long version = 1L;
long versionHash = 0L;
int schemaHash = UnitTestUtil.SCHEMA_HASH;
JobType type = JobType.SUPPLEMENT;
JobPriority priority = JobPriority.LOW;
long timeoutSecond = 3600L;
Assert.assertTrue(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority, timeoutSecond));
Assert.assertEquals(1, clone.getJobNum());
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, 1, TStorageMedium.HDD);
Catalog.getCurrentInvertedIndex().addTablet(tabletId, tabletMeta);
Replica replica = new Replica();
Catalog.getCurrentInvertedIndex().addReplica(tabletId, replica);
// finish clone job: db does not exist
List<CloneJob> pendingJobs = clone.getCloneJobs(JobState.PENDING);
CloneJob job = pendingJobs.get(0);
job.setState(JobState.RUNNING);
CloneTask task = new CloneTask(backendId, dbId, tableId, partitionId, indexId, tabletId,
schemaHash, new ArrayList<TBackend>(), TStorageMedium.HDD,
version, versionHash);
TTabletInfo tabletInfo = new TTabletInfo(tabletId, schemaHash, version, versionHash, 0L, 0L);
clone.finishCloneJob(task, tabletInfo);
Assert.assertEquals(1, clone.getJobNum());
List<CloneJob> cancelledJobs = clone.getCloneJobs(JobState.CANCELLED);
Assert.assertEquals(1, cancelledJobs.size());
// remove cancelled clone job
clone.removeCloneJobs();
Assert.assertEquals(0, clone.getJobNum());
// add tablet0 clone job again
Assert.assertTrue(clone.addCloneJob(dbId, tableId, partitionId, indexId, tabletId, backendId,
type, priority,
timeoutSecond));
Assert.assertEquals(1, clone.getJobNum());
pendingJobs = clone.getCloneJobs(JobState.PENDING);
job = pendingJobs.get(0);
job.setState(JobState.RUNNING);
// finish clone job success
Database db = UnitTestUtil.createDb(dbId, tableId, partitionId, indexId, tabletId, backendId,
version, versionHash);
OlapTable table = (OlapTable) db.getTable(tableId);
Partition partition = table.getPartition(partitionId);
MaterializedIndex index = partition.getBaseIndex();
Tablet tablet = index.getTablet(tabletId);
Replica replica2 = tablet.getReplicaByBackendId(backendId);
replica2.setState(ReplicaState.CLONE);
Catalog catalog = EasyMock.createMock(Catalog.class);
EasyMock.expect(catalog.getDb(EasyMock.anyLong())).andReturn(db).anyTimes();
EditLog editLog = EasyMock.createMock(EditLog.class);
EasyMock.expect(catalog.getEditLog()).andReturn(editLog).anyTimes();
EasyMock.replay(catalog);
PowerMock.mockStatic(Catalog.class);
EasyMock.expect(Catalog.getInstance()).andReturn(catalog).anyTimes();
PowerMock.replay(Catalog.class);
clone.finishCloneJob(task, tabletInfo);
Assert.assertEquals(1, clone.getJobNum());
List<CloneJob> finishedJobs = clone.getCloneJobs(JobState.FINISHED);
Assert.assertEquals(1, finishedJobs.size());
Assert.assertEquals(ReplicaState.NORMAL, replica2.getState());
}
@Test
public void testCalculatePriority() {
short onlineReplicaNum = 2;
short replicationNum = 3;
Assert.assertEquals(JobPriority.LOW, Clone.calculatePriority(onlineReplicaNum, replicationNum));
onlineReplicaNum = 2;
replicationNum = 4;
Assert.assertEquals(JobPriority.NORMAL, Clone.calculatePriority(onlineReplicaNum, replicationNum));
onlineReplicaNum = 1;
replicationNum = 2;
Assert.assertEquals(JobPriority.NORMAL, Clone.calculatePriority(onlineReplicaNum, replicationNum));
}
}