[Feature] Support recording custom number of backup and restore task information (#5947)

* Record all backup jobs and support where clause
Hao Tan authored on 2021-06-22 09:19:54 +08:00, committed by GitHub
parent abcd56c6c8, commit b9ad34736d
8 changed files with 287 additions and 52 deletions
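
In short: SHOW BACKUP and SHOW RESTORE used to report only the last job of a database. After this commit the FE keeps a bounded per-database job history (capped by the new max_backup_restore_job_num_per_db config, default 10), and both statements accept a label filter, e.g. SHOW BACKUP FROM example_db WHERE LABEL = "my_backup" or SHOW RESTORE FROM example_db WHERE LABEL LIKE "nightly%" (the database and label names here are illustrative).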

sql_parser.cup

@@ -2509,9 +2509,9 @@ show_param ::=
{:
RESULT = new ShowUserPropertyStmt(user, parser.wild);
:}
| KW_BACKUP opt_db:db
| KW_BACKUP opt_db:db opt_wild_where
{:
RESULT = new ShowBackupStmt(db);
RESULT = new ShowBackupStmt(db, parser.where);
:}
| KW_RESTORE opt_db:db opt_wild_where
{:
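
The new production mirrors the KW_RESTORE rule directly below it: opt_wild_where captures an optional WHERE expression into parser.where, which is now handed to ShowBackupStmt the same way it already was for ShowRestoreStmt.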

ShowBackupStmt.java

@@ -21,8 +21,11 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -31,6 +34,8 @@ import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.util.function.Predicate;
public class ShowBackupStmt extends ShowStmt {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("JobId").add("SnapshotName").add("DbName").add("State").add("BackupObjs").add("CreateTime")
@@ -39,9 +44,13 @@ public class ShowBackupStmt extends ShowStmt {
.build();
private String dbName;
private final Expr where;
private boolean isAccurateMatch;
private String labelValue;
public ShowBackupStmt(String dbName) {
public ShowBackupStmt(String dbName, Expr where) {
this.dbName = dbName;
this.where = where;
}
public String getDbName() {
@@ -65,6 +74,56 @@ public class ShowBackupStmt extends ShowStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED,
ConnectContext.get().getQualifiedUser(), dbName);
}
if (where == null) {
return;
}
boolean valid = analyzeWhereClause();
if (!valid) {
throw new AnalysisException("Where clause should like: LABEL = \"your_label_name\", "
+ " or LABEL LIKE \"matcher\"");
}
}
private boolean analyzeWhereClause() {
if (!(where instanceof LikePredicate) && !(where instanceof BinaryPredicate)) {
return false;
}
if (where instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) where;
if (BinaryPredicate.Operator.EQ != binaryPredicate.getOp()) {
return false;
}
isAccurateMatch = true;
}
if (where instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) where;
if (LikePredicate.Operator.LIKE != likePredicate.getOp()) {
return false;
}
}
// left child
if (!(where.getChild(0) instanceof SlotRef)) {
return false;
}
String leftKey = ((SlotRef) where.getChild(0)).getColumnName();
if (!"label".equalsIgnoreCase(leftKey)) {
return false;
}
// right child
if (!(where.getChild(1) instanceof StringLiteral)) {
return false;
}
labelValue = ((StringLiteral) where.getChild(1)).getStringValue();
if (Strings.isNullOrEmpty(labelValue)) {
return false;
}
return true;
}
@Override
@@ -84,6 +143,10 @@ public class ShowBackupStmt extends ShowStmt {
builder.append(" FROM `").append(dbName).append("` ");
}
if (where != null) {
builder.append(where.toSql());
}
return builder.toString();
}
@@ -96,4 +159,28 @@ public class ShowBackupStmt extends ShowStmt {
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
public boolean isAccurateMatch() {
return isAccurateMatch;
}
public String getLabelValue() {
return labelValue;
}
public Expr getWhere() {
return where;
}
public Predicate<String> getLabelPredicate() throws AnalysisException {
if (null == where) {
return label -> true;
}
if (isAccurateMatch) {
return CaseSensibility.LABEL.getCaseSensibility() ? label -> label.equals(labelValue) : label -> label.equalsIgnoreCase(labelValue);
} else {
PatternMatcher patternMatcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
return patternMatcher::match;
}
}
}
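
To make the matching rules concrete, below is a minimal standalone sketch (a hypothetical class, not Doris code) of the predicate getLabelPredicate() builds: an EQ clause compares labels directly, honoring the case sensitivity that CaseSensibility.LABEL controls in Doris, while a LIKE clause uses MySQL wildcard semantics, assuming PatternMatcher.createMysqlPattern maps % to any character sequence and _ to a single character, as MySQL's LIKE does.

    import java.util.function.Predicate;
    import java.util.regex.Pattern;

    public class LabelPredicateSketch {

        // Build the same kind of predicate getLabelPredicate() returns.
        static Predicate<String> build(boolean accurateMatch, String labelValue, boolean caseSensitive) {
            if (accurateMatch) {
                // EQ: plain equality; case sensitivity is decided by CaseSensibility.LABEL in Doris.
                return caseSensitive ? labelValue::equals
                                     : label -> label.equalsIgnoreCase(labelValue);
            }
            // LIKE: translate MySQL wildcards into a regex, quoting everything else literally.
            StringBuilder regex = new StringBuilder();
            for (char c : labelValue.toCharArray()) {
                if (c == '%') {
                    regex.append(".*");
                } else if (c == '_') {
                    regex.append('.');
                } else {
                    regex.append(Pattern.quote(String.valueOf(c)));
                }
            }
            Pattern pattern = Pattern.compile(regex.toString(), caseSensitive ? 0 : Pattern.CASE_INSENSITIVE);
            return label -> pattern.matcher(label).matches();
        }

        public static void main(String[] args) {
            Predicate<String> eq = build(true, "nightly_backup", false);
            Predicate<String> like = build(false, "nightly%", true);
            System.out.println(eq.test("NIGHTLY_BACKUP"));       // true: case-insensitive equality
            System.out.println(like.test("nightly_2021_06_22")); // true: % matches the suffix
            System.out.println(like.test("weekly_2021_06_22"));  // false
        }
    }

PatternMatcher may differ in detail (escape handling, for instance), so this is a behavioral approximation of the matching semantics, not the library's implementation.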

ShowRestoreStmt.java

@@ -21,8 +21,11 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -31,6 +34,8 @@ import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.util.function.Predicate;
public class ShowRestoreStmt extends ShowStmt {
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("JobId").add("Label").add("Timestamp").add("DbName").add("State")
@@ -42,7 +47,8 @@ public class ShowRestoreStmt extends ShowStmt {
private String dbName;
private Expr where;
private String label;
private String labelValue;
private boolean isAccurateMatch;
public ShowRestoreStmt(String dbName, Expr where) {
this.dbName = dbName;
@@ -53,8 +59,8 @@
return dbName;
}
public String getLabel() {
return label;
public String getLabelValue() {
return labelValue;
}
@Override
@@ -74,6 +80,56 @@ public class ShowRestoreStmt extends ShowStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_DB_ACCESS_DENIED,
ConnectContext.get().getQualifiedUser(), dbName);
}
if (where == null) {
return;
}
boolean valid = analyzeWhereClause();
if (!valid) {
throw new AnalysisException("Where clause should like: LABEL = \"your_label_name\", "
+ " or LABEL LIKE \"matcher\"");
}
}
private boolean analyzeWhereClause() {
if (!(where instanceof LikePredicate) && !(where instanceof BinaryPredicate)) {
return false;
}
if (where instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) where;
if (BinaryPredicate.Operator.EQ != binaryPredicate.getOp()) {
return false;
}
isAccurateMatch = true;
}
if (where instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) where;
if (LikePredicate.Operator.LIKE != likePredicate.getOp()) {
return false;
}
}
// left child
if (!(where.getChild(0) instanceof SlotRef)) {
return false;
}
String leftKey = ((SlotRef) where.getChild(0)).getColumnName();
if (!"label".equalsIgnoreCase(leftKey)) {
return false;
}
// right child
if (!(where.getChild(1) instanceof StringLiteral)) {
return false;
}
labelValue = ((StringLiteral) where.getChild(1)).getStringValue();
if (Strings.isNullOrEmpty(labelValue)) {
return false;
}
return true;
}
@Override
@@ -106,5 +162,25 @@ public class ShowRestoreStmt extends ShowStmt {
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
public boolean isAccurateMatch() {
return isAccurateMatch;
}
public Expr getWhere() {
return where;
}
public Predicate<String> getLabelPredicate() throws AnalysisException {
if (null == where) {
return label -> true;
}
if (isAccurateMatch) {
return CaseSensibility.LABEL.getCaseSensibility() ? label -> label.equals(labelValue) : label -> label.equalsIgnoreCase(labelValue);
} else {
PatternMatcher patternMatcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
return patternMatcher::match;
}
}
}
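
ShowRestoreStmt receives the same WHERE-clause validation and predicate construction as ShowBackupStmt above; beyond that, the label field is renamed to labelValue (with getLabel() becoming getLabelValue()) for consistency.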

BackupHandler.java

@@ -55,9 +55,9 @@ import org.apache.doris.thrift.TTaskType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,11 +67,16 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class BackupHandler extends MasterDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(BackupHandler.class);
@@ -82,13 +87,14 @@ public class BackupHandler extends MasterDaemon implements Writable {
private RepositoryMgr repoMgr = new RepositoryMgr();
// db id -> last running or finished backup/restore jobs
// We only save the last backup/restore job of a database.
// this lock is used for updating dbIdToBackupOrRestoreJobs
private final ReentrantLock jobLock = new ReentrantLock();
// db id -> last 10(max_backup_restore_job_num_per_db) backup/restore jobs
// Newly submitted job will replace the current job, only if current job is finished or cancelled.
// If the last job is finished, user can get the job info from repository. If the last job is cancelled,
// user can get the error message before submitting the next one.
// Use ConcurrentMap to get rid of locks.
private Map<Long, AbstractJob> dbIdToBackupOrRestoreJob = Maps.newConcurrentMap();
private final Map<Long, Deque<AbstractJob>> dbIdToBackupOrRestoreJobs = new HashMap<>();
// this lock is used for handling one backup or restore request at a time.
private ReentrantLock seqlock = new ReentrantLock();
@@ -154,7 +160,19 @@
}
public AbstractJob getJob(long dbId) {
return dbIdToBackupOrRestoreJob.get(dbId);
return getCurrentJob(dbId);
}
public List<AbstractJob> getJobs(long dbId, Predicate<String> predicate) {
jobLock.lock();
try {
return dbIdToBackupOrRestoreJobs.getOrDefault(dbId, new LinkedList<>())
.stream()
.filter(e -> predicate.test(e.getLabel()))
.collect(Collectors.toList());
} finally {
jobLock.unlock();
}
}
@Override
@@ -165,7 +183,7 @@
}
}
for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) {
for (AbstractJob job : getAllCurrentJobs()) {
job.setCatalog(catalog);
job.run();
}
@@ -197,8 +215,8 @@
if (repo == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Repository does not exist");
}
for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) {
for (AbstractJob job : getAllCurrentJobs()) {
if (!job.isDone() && job.getRepoId() == repo.getId()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Backup or restore job is running on this repository."
@@ -239,7 +257,7 @@
tryLock();
try {
// Check if there is backup or restore job running on this database
AbstractJob currentJob = dbIdToBackupOrRestoreJob.get(db.getId());
AbstractJob currentJob = getCurrentJob(db.getId());
if (currentJob != null && !currentJob.isDone()) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
"Can only run one backup or restore job of a database at same time");
@@ -364,7 +382,7 @@
catalog.getEditLog().logBackupJob(backupJob);
// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
dbIdToBackupOrRestoreJob.put(db.getId(), backupJob);
addBackupOrRestoreJob(db.getId(), backupJob);
LOG.info("finished to submit backup job: {}", backupJob);
}
@@ -392,11 +410,51 @@
catalog.getEditLog().logRestoreJob(restoreJob);
// must put to dbIdToBackupOrRestoreJob after edit log, otherwise the state of job may be changed.
dbIdToBackupOrRestoreJob.put(db.getId(), restoreJob);
addBackupOrRestoreJob(db.getId(), restoreJob);
LOG.info("finished to submit restore job: {}", restoreJob);
}
private void addBackupOrRestoreJob(long dbId, AbstractJob job) {
jobLock.lock();
try {
Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.computeIfAbsent(dbId, k -> Lists.newLinkedList());
while (jobs.size() >= Config.max_backup_restore_job_num_per_db) {
jobs.removeFirst();
}
AbstractJob lastJob = jobs.peekLast();
// Remove duplicate jobs and keep only the latest status
// Otherwise, the tasks that have been successfully executed will be repeated when replaying edit log.
if (lastJob != null && (lastJob.isPending() || lastJob.getJobId() == job.getJobId())) {
jobs.removeLast();
}
jobs.addLast(job);
} finally {
jobLock.unlock();
}
}
private List<AbstractJob> getAllCurrentJobs() {
jobLock.lock();
try {
return dbIdToBackupOrRestoreJobs.values().stream().filter(CollectionUtils::isNotEmpty)
.map(Deque::getLast).collect(Collectors.toList());
} finally {
jobLock.unlock();
}
}
private AbstractJob getCurrentJob(long dbId) {
jobLock.lock();
try {
Deque<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.getOrDefault(dbId, Lists.newLinkedList());
return jobs.isEmpty() ? null : jobs.getLast();
} finally {
jobLock.unlock();
}
}
private void checkAndFilterRestoreObjsExistInSnapshot(BackupJobInfo jobInfo,
AbstractBackupTableRefClause backupTableRefClause)
throws DdlException {
@@ -490,8 +548,8 @@
if (db == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
}
AbstractJob job = dbIdToBackupOrRestoreJob.get(db.getId());
AbstractJob job = getCurrentJob(db.getId());
if (job == null || (job instanceof BackupJob && stmt.isRestore())
|| (job instanceof RestoreJob && !stmt.isRestore())) {
ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "No "
@@ -508,7 +566,8 @@
}
public boolean handleFinishedSnapshotTask(SnapshotTask task, TFinishTaskRequest request) {
AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId());
AbstractJob job = getCurrentJob(task.getDbId());
if (job == null) {
LOG.warn("failed to find backup or restore job for task: {}", task);
// return true to remove this task from AgentTaskQueue
@@ -533,7 +592,7 @@
}
public boolean handleFinishedSnapshotUploadTask(UploadTask task, TFinishTaskRequest request) {
AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId());
AbstractJob job = getCurrentJob(task.getDbId());
if (job == null || (job instanceof RestoreJob)) {
LOG.info("invalid upload task: {}, no backup job is found. db id: {}", task, task.getDbId());
return false;
@@ -548,8 +607,8 @@
}
public boolean handleDownloadSnapshotTask(DownloadTask task, TFinishTaskRequest request) {
AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId());
if (job == null || !(job instanceof RestoreJob)) {
AbstractJob job = getCurrentJob(task.getDbId());
if (!(job instanceof RestoreJob)) {
LOG.warn("failed to find restore job for task: {}", task);
// return true to remove this task from AgentTaskQueue
return true;
@@ -559,8 +618,8 @@
}
public boolean handleDirMoveTask(DirMoveTask task, TFinishTaskRequest request) {
AbstractJob job = dbIdToBackupOrRestoreJob.get(task.getDbId());
if (job == null || !(job instanceof RestoreJob)) {
AbstractJob job = getCurrentJob(task.getDbId());
if (!(job instanceof RestoreJob)) {
LOG.warn("failed to find restore job for task: {}", task);
// return true to remove this task from AgentTaskQueue
return true;
@@ -571,16 +630,16 @@
public void replayAddJob(AbstractJob job) {
if (job.isCancelled()) {
AbstractJob existingJob = dbIdToBackupOrRestoreJob.get(job.getDbId());
AbstractJob existingJob = getCurrentJob(job.getDbId());
if (existingJob == null || existingJob.isDone()) {
LOG.error("invalid existing job: {}. current replay job is: {}",
existingJob, job);
existingJob, job);
return;
}
existingJob.setCatalog(catalog);
existingJob.replayCancel();
} else if (!job.isPending()) {
AbstractJob existingJob = dbIdToBackupOrRestoreJob.get(job.getDbId());
AbstractJob existingJob = getCurrentJob(job.getDbId());
if (existingJob == null || existingJob.isDone()) {
LOG.error("invalid existing job: {}. current replay job is: {}",
existingJob, job);
@@ -591,11 +650,12 @@
// for example: In restore job, PENDING will transfer to SNAPSHOTING, not DOWNLOAD.
job.replayRun();
}
dbIdToBackupOrRestoreJob.put(job.getDbId(), job);
addBackupOrRestoreJob(job.getDbId(), job);
}
public boolean report(TTaskType type, long jobId, long taskId, int finishedNum, int totalNum) {
for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) {
for (AbstractJob job : getAllCurrentJobs()) {
if (job.getType() == JobType.BACKUP) {
if (!job.isDone() && job.getJobId() == jobId && type == TTaskType.UPLOAD) {
job.taskProgress.put(taskId, Pair.create(finishedNum, totalNum));
@@ -621,8 +681,9 @@
public void write(DataOutput out) throws IOException {
repoMgr.write(out);
out.writeInt(dbIdToBackupOrRestoreJob.size());
for (AbstractJob job : dbIdToBackupOrRestoreJob.values()) {
List<AbstractJob> jobs = dbIdToBackupOrRestoreJobs.values().stream().flatMap(Deque::stream).collect(Collectors.toList());
out.writeInt(jobs.size());
for (AbstractJob job : jobs) {
job.write(out);
}
}
@@ -633,7 +694,7 @@
int size = in.readInt();
for (int i = 0; i < size; i++) {
AbstractJob job = AbstractJob.read(in);
dbIdToBackupOrRestoreJob.put(job.getDbId(), job);
addBackupOrRestoreJob(job.getDbId(), job);
}
}
}
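
The retention policy in addBackupOrRestoreJob above can be summarized with a small self-contained sketch (hypothetical names; a constant stands in for Config.max_backup_restore_job_num_per_db): each database keeps at most N jobs in arrival order, evicting the oldest first, and a job that arrives while the tail entry is still pending, or that replays with the tail's job id, replaces the tail rather than appending a duplicate entry.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReentrantLock;

    public class JobHistorySketch {
        // Stand-in for Config.max_backup_restore_job_num_per_db; small so the demo shows eviction.
        static final int MAX_JOBS_PER_DB = 3;

        static class Job {
            final long jobId;
            final boolean pending;
            Job(long jobId, boolean pending) { this.jobId = jobId; this.pending = pending; }
        }

        private final Map<Long, Deque<Job>> jobsByDb = new HashMap<>();
        private final ReentrantLock lock = new ReentrantLock();

        void add(long dbId, Job job) {
            lock.lock();
            try {
                Deque<Job> jobs = jobsByDb.computeIfAbsent(dbId, k -> new ArrayDeque<>());
                while (jobs.size() >= MAX_JOBS_PER_DB) {
                    jobs.removeFirst(); // evict the oldest entry first
                }
                Job last = jobs.peekLast();
                // Replace the tail instead of appending when the same job is replayed
                // (same id) or the tail is still pending, so edit-log replay cannot
                // leave duplicate entries for one job.
                if (last != null && (last.pending || last.jobId == job.jobId)) {
                    jobs.removeLast();
                }
                jobs.addLast(job);
            } finally {
                lock.unlock();
            }
        }

        int historySize(long dbId) {
            lock.lock();
            try {
                Deque<Job> jobs = jobsByDb.get(dbId);
                return jobs == null ? 0 : jobs.size();
            } finally {
                lock.unlock();
            }
        }

        public static void main(String[] args) {
            JobHistorySketch handler = new JobHistorySketch();
            handler.add(1L, new Job(100, true));   // submit job 100
            handler.add(1L, new Job(100, false));  // replay of job 100 replaces the pending entry
            System.out.println(handler.historySize(1L)); // 1
            for (long id = 101; id <= 104; id++) {
                handler.add(1L, new Job(id, false));
            }
            System.out.println(handler.historySize(1L)); // 3: capped, oldest jobs evicted
        }
    }

The same replace-or-append rule is what keeps edit-log replay idempotent: readFields() above feeds every persisted job back through addBackupOrRestoreJob.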

Config.java

@@ -1402,4 +1402,10 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_dynamic_partition_num = 500;
/*
* Control the max num of backup/restore job per db
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_backup_restore_job_num_per_db = 10;
}
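
Since the field is declared @ConfField(mutable = true, masterOnly = true), the cap should be adjustable at runtime on the master FE, e.g. ADMIN SET FRONTEND CONFIG ("max_backup_restore_job_num_per_db" = "20"); (the value 20 is illustrative), as well as via fe.conf.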

ShowExecutor.java

@@ -1614,16 +1614,13 @@ public class ShowExecutor {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName());
}
AbstractJob jobI = Catalog.getCurrentCatalog().getBackupHandler().getJob(db.getId());
if (!(jobI instanceof BackupJob)) {
resultSet = new ShowResultSet(showStmt.getMetaData(), EMPTY_SET);
return;
}
List<AbstractJob> jobs = Catalog.getCurrentCatalog().getBackupHandler().getJobs(db.getId(), showStmt.getLabelPredicate());
List<BackupJob> backupJobs = jobs.stream().filter(job -> job instanceof BackupJob)
.map(job -> (BackupJob) job).collect(Collectors.toList());
List<List<String>> infos = backupJobs.stream().map(BackupJob::getInfo).collect(Collectors.toList());
BackupJob backupJob = (BackupJob) jobI;
List<String> info = backupJob.getInfo();
List<List<String>> infos = Lists.newArrayList();
infos.add(info);
resultSet = new ShowResultSet(showStmt.getMetaData(), infos);
}
@@ -1634,16 +1631,13 @@ public class ShowExecutor {
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName());
}
AbstractJob jobI = Catalog.getCurrentCatalog().getBackupHandler().getJob(db.getId());
if (!(jobI instanceof RestoreJob)) {
resultSet = new ShowResultSet(showStmt.getMetaData(), EMPTY_SET);
return;
}
List<AbstractJob> jobs = Catalog.getCurrentCatalog().getBackupHandler().getJobs(db.getId(), showStmt.getLabelPredicate());
List<RestoreJob> restoreJobs = jobs.stream().filter(job -> job instanceof RestoreJob)
.map(job -> (RestoreJob) job).collect(Collectors.toList());
List<List<String>> infos = restoreJobs.stream().map(RestoreJob::getInfo).collect(Collectors.toList());
RestoreJob restoreJob = (RestoreJob) jobI;
List<String> info = restoreJob.getInfo();
List<List<String>> infos = Lists.newArrayList();
infos.add(info);
resultSet = new ShowResultSet(showStmt.getMetaData(), infos);
}