[improve](backup) Add the commit seq at the backup job level. #44049 (#44110)

Cherry-picked from #44049.
walter
2024-11-18 15:00:08 +08:00
committed by GitHub
parent 8da1e8c084
commit 6c796af81b
8 changed files with 85 additions and 23 deletions

View File (BackupHandler.java)

@@ -51,6 +51,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;
@@ -356,20 +357,32 @@ public class BackupHandler extends MasterDaemon implements Writable {
+ " is read only");
}
// Determine the tables to be backed up
long commitSeq = 0;
Set<String> tableNames = Sets.newHashSet();
AbstractBackupTableRefClause abstractBackupTableRefClause = stmt.getAbstractBackupTableRefClause();
if (abstractBackupTableRefClause == null) {
tableNames = db.getTableNamesWithLock();
} else if (abstractBackupTableRefClause.isExclude()) {
tableNames = db.getTableNamesWithLock();
for (TableRef tableRef : abstractBackupTableRefClause.getTableRefList()) {
if (!tableNames.remove(tableRef.getName().getTbl())) {
LOG.info("exclude table " + tableRef.getName().getTbl()
+ " of backup stmt is not exists in db " + db.getFullName());
// Obtain the snapshot commit seq, so that any create-table binlog will be visible.
db.readLock();
try {
BarrierLog log = new BarrierLog(db.getId(), db.getFullName());
commitSeq = env.getEditLog().logBarrier(log);
// Determine the tables to be backed up
if (abstractBackupTableRefClause == null) {
tableNames = db.getTableNames();
} else if (abstractBackupTableRefClause.isExclude()) {
tableNames = db.getTableNames();
for (TableRef tableRef : abstractBackupTableRefClause.getTableRefList()) {
if (!tableNames.remove(tableRef.getName().getTbl())) {
LOG.info("exclude table " + tableRef.getName().getTbl()
+ " of backup stmt is not exists in db " + db.getFullName());
}
}
}
} finally {
db.readUnlock();
}
List<TableRef> tblRefs = Lists.newArrayList();
if (abstractBackupTableRefClause != null && !abstractBackupTableRefClause.isExclude()) {
tblRefs = abstractBackupTableRefClause.getTableRefList();
@@ -387,6 +400,14 @@ public class BackupHandler extends MasterDaemon implements Writable {
for (TableRef tblRef : tblRefs) {
String tblName = tblRef.getName().getTbl();
Table tbl = db.getTableOrDdlException(tblName);
// filter the table types which are not supported by local backup.
if (repository == null && tbl.getType() != TableType.OLAP
&& tbl.getType() != TableType.VIEW && tbl.getType() != TableType.MATERIALIZED_VIEW) {
tblRefsNotSupport.add(tblRef);
continue;
}
if (tbl.getType() == TableType.VIEW || tbl.getType() == TableType.ODBC
|| tbl.getType() == TableType.MATERIALIZED_VIEW) {
continue;
@@ -434,7 +455,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
tblRefs.removeAll(tblRefsNotSupport);
// Check if label already be used
long repoId = -1;
long repoId = Repository.KEEP_ON_LOCAL_REPO_ID;
if (repository != null) {
List<String> existSnapshotNames = Lists.newArrayList();
Status st = repository.listSnapshots(existSnapshotNames);
@@ -456,7 +477,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
// Create a backup job
BackupJob backupJob = new BackupJob(stmt.getLabel(), db.getId(),
ClusterNamespace.getNameFromFullName(db.getFullName()),
tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId);
tblRefs, stmt.getTimeoutMs(), stmt.getContent(), env, repoId, commitSeq);
// write log
env.getEditLog().logBackupJob(backupJob);

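Note that the barrier edit-log entry and the copy of the table-name set are both taken under the same database read lock, so the returned commit seq stays consistent with the captured set of tables. Below is a minimal, self-contained sketch of this lock-then-barrier pattern; the lock, journal counter, and catalog map are illustrative stand-ins, not Doris APIs.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class BarrierSketch {
    private final ReentrantReadWriteLock dbLock = new ReentrantReadWriteLock();
    private final AtomicLong journalId = new AtomicLong();                // stand-in for the edit log
    private final Map<String, Object> nameToTable = new ConcurrentHashMap<>();

    // Log the barrier and copy the table names under one read lock, so a
    // concurrent create/drop (which needs the write lock in this sketch)
    // cannot slip between the commit seq and the captured set.
    long barrierAndSnapshotNames(Set<String> out) {
        dbLock.readLock().lock();
        try {
            long commitSeq = journalId.incrementAndGet();                 // "logBarrier"
            out.addAll(new HashSet<>(nameToTable.keySet()));
            return commitSeq;
        } finally {
            dbLock.readLock().unlock();
        }
    }
}
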
View File (BackupJob.java)

@@ -36,6 +36,7 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.BarrierLog;
@@ -85,6 +86,7 @@ import java.util.zip.GZIPOutputStream;
public class BackupJob extends AbstractJob {
private static final Logger LOG = LogManager.getLogger(BackupJob.class);
private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";
private static final String SNAPSHOT_COMMIT_SEQ = "commit_seq";
public enum BackupJobState {
PENDING, // Job is newly created. Send snapshot tasks and save copied meta info, then transfer to SNAPSHOTING
@@ -123,6 +125,8 @@ public class BackupJob extends AbstractJob {
// backup properties, and per-table commit seqs keyed by table id
private Map<String, String> properties = Maps.newHashMap();
private long commitSeq = 0;
public BackupJob() {
super(JobType.BACKUP);
}
@@ -133,11 +137,13 @@
}
public BackupJob(String label, long dbId, String dbName, List<TableRef> tableRefs, long timeoutMs,
BackupContent content, Env env, long repoId) {
BackupContent content, Env env, long repoId, long commitSeq) {
super(JobType.BACKUP, label, dbId, dbName, timeoutMs, env, repoId);
this.tableRefs = tableRefs;
this.state = BackupJobState.PENDING;
this.commitSeq = commitSeq;
properties.put(BackupStmt.PROP_CONTENT, content.name());
properties.put(SNAPSHOT_COMMIT_SEQ, String.valueOf(commitSeq));
}
public BackupJobState getState() {
@@ -237,7 +243,7 @@ public class BackupJob extends AbstractJob {
if (request.getTaskStatus().getStatusCode() == TStatusCode.TABLET_MISSING
&& !tryNewTabletSnapshotTask(task)) {
status = new Status(ErrCode.NOT_FOUND,
"make snapshot failed, failed to ge tablet, table will be droped or truncated");
"make snapshot failed, failed to ge tablet, table will be dropped or truncated");
cancelInternal();
}
@@ -407,6 +413,14 @@
LOG.debug("run backup job: {}", this);
}
if (state == BackupJobState.PENDING) {
String pausedLabel = DebugPointUtil.getDebugParamOrDefault("FE.PAUSE_PENDING_BACKUP_JOB", "");
if (!pausedLabel.isEmpty() && label.startsWith(pausedLabel)) {
LOG.info("pause pending backup job by debug point: {}", this);
return;
}
}
// run job base on current state
switch (state) {
case PENDING:
@@ -569,7 +583,7 @@
private Status prepareSnapshotTaskForOlapTableWithoutLock(Database db, OlapTable olapTable,
TableRef backupTableRef, AgentBatchTask batchTask) {
// Add barrier editolog for barrier commit seq
// Add barrier editlog for barrier commit seq
long dbId = db.getId();
String dbName = db.getFullName();
long tableId = olapTable.getId();
@@ -703,13 +717,11 @@
private void waitingAllSnapshotsFinished() {
if (unfinishedTaskIds.isEmpty()) {
if (env.getEditLog().exceedMaxJournalSize(this)) {
status = new Status(ErrCode.COMMON_ERROR, "backup job is too large");
return;
}
snapshotFinishedTime = System.currentTimeMillis();
state = BackupJobState.UPLOAD_SNAPSHOT;
@@ -1026,6 +1038,10 @@
return repoId == Repository.KEEP_ON_LOCAL_REPO_ID;
}
public long getCommitSeq() {
return commitSeq;
}
// read meta and job info bytes from disk, and return the snapshot
public synchronized Snapshot getSnapshot() {
if (state != BackupJobState.FINISHED || repoId != Repository.KEEP_ON_LOCAL_REPO_ID) {
@@ -1035,7 +1051,7 @@
// Avoid loading expired meta.
long expiredAt = createTime + timeoutMs;
if (System.currentTimeMillis() >= expiredAt) {
return new Snapshot(label, new byte[0], new byte[0], expiredAt);
return new Snapshot(label, new byte[0], new byte[0], expiredAt, commitSeq);
}
try {
@@ -1043,7 +1059,7 @@
File jobInfoFile = new File(localJobInfoFilePath);
byte[] metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
byte[] jobInfoBytes = Files.readAllBytes(jobInfoFile.toPath());
return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt);
return new Snapshot(label, metaInfoBytes, jobInfoBytes, expiredAt, commitSeq);
} catch (IOException e) {
LOG.warn("failed to load meta info and job info file, meta info file {}, job info file {}: ",
localMetaInfoFilePath, localJobInfoFilePath, e);
@@ -1236,6 +1252,10 @@
String value = Text.readString(in);
properties.put(key, value);
}
if (properties.containsKey(SNAPSHOT_COMMIT_SEQ)) {
commitSeq = Long.parseLong(properties.get(SNAPSHOT_COMMIT_SEQ));
}
}
@Override

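The commit seq is persisted through the job's existing properties map under the "commit_seq" key rather than as a new serialized field, so job images written by older versions still load: they simply lack the key and keep the default of 0. A minimal sketch of that round trip, assuming plain maps rather than the Doris Text I/O helpers:

import java.util.HashMap;
import java.util.Map;

class CommitSeqProps {
    static final String SNAPSHOT_COMMIT_SEQ = "commit_seq";

    static Map<String, String> write(long commitSeq) {
        Map<String, String> properties = new HashMap<>();
        properties.put(SNAPSHOT_COMMIT_SEQ, String.valueOf(commitSeq));
        return properties;
    }

    static long read(Map<String, String> properties) {
        // Images from older versions carry no "commit_seq" entry; keep 0.
        String value = properties.get(SNAPSHOT_COMMIT_SEQ);
        return value == null ? 0L : Long.parseLong(value);
    }
}
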
View File (Snapshot.java)

@@ -34,14 +34,18 @@ public class Snapshot {
@SerializedName(value = "expired_at")
private long expiredAt = 0;
@SerializedName(value = "commitSeq")
private long commitSeq = 0;
public Snapshot() {
}
public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt) {
public Snapshot(String label, byte[] meta, byte[] jobInfo, long expiredAt, long commitSeq) {
this.label = label;
this.meta = meta;
this.jobInfo = jobInfo;
this.expiredAt = expiredAt;
this.commitSeq = commitSeq;
}
public byte[] getMeta() {
@@ -60,6 +64,10 @@
return System.currentTimeMillis() > expiredAt;
}
public long getCommitSeq() {
return commitSeq;
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
@@ -71,6 +79,7 @@
+ ", meta=" + meta
+ ", jobInfo=" + jobInfo
+ ", expiredAt=" + expiredAt
+ ", commitSeq=" + commitSeq
+ '}';
}
}

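Snapshot is serialized with Gson, so the @SerializedName annotation fixes the JSON key of the new field. A small, self-contained illustration (GsonUtils is Doris-internal; a plain Gson instance behaves the same for this field):

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

class SnapshotJsonDemo {
    @SerializedName(value = "commitSeq")
    private long commitSeq = 42;

    public static void main(String[] args) {
        // Prints {"commitSeq":42}; older readers simply ignore the unknown key.
        System.out.println(new Gson().toJson(new SnapshotJsonDemo()));
    }
}
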
View File (Database.java)

@@ -514,6 +514,10 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
}
}
public Set<String> getTableNames() {
return new HashSet<>(this.nameToTable.keySet());
}
/**
* This is a thread-safe method when nameToTable is a concurrent hash map
*/

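Unlike the existing getTableNamesWithLock(), this getTableNames() does not take the database lock itself; in the BackupHandler change above it is called while the caller already holds db.readLock(), so the copied name set stays consistent with the barrier's commit seq. Copying keySet() into a fresh HashSet also keeps the returned set independent of later catalog changes.
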
View File (BarrierLog.java)

@@ -46,6 +46,11 @@ public class BarrierLog implements Writable {
public BarrierLog() {
}
public BarrierLog(long dbId, String dbName) {
this.dbId = dbId;
this.dbName = dbName;
}
public BarrierLog(long dbId, String dbName, long tableId, String tableName) {
this.dbId = dbId;
this.dbName = dbName;

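The new two-argument constructor writes a database-level barrier (no table id or name); BackupHandler uses it above to stamp a commit seq for the whole database, while the existing four-argument form remains available for per-table barriers.
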
View File (FrontendServiceImpl.java)

@@ -3034,9 +3034,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
byte[] meta = snapshot.getMeta();
byte[] jobInfo = snapshot.getJobInfo();
long expiredAt = snapshot.getExpiredAt();
long commitSeq = snapshot.getCommitSeq();
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, expired at: {}",
label, meta.length, jobInfo.length, expiredAt);
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, "
+ "expired at: {}, commit seq: {}", label, meta.length, jobInfo.length, expiredAt, commitSeq);
if (request.isEnableCompress()) {
meta = GZIPUtils.compress(meta);
jobInfo = GZIPUtils.compress(jobInfo);
@@ -3049,6 +3050,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.setMeta(meta);
result.setJobInfo(jobInfo);
result.setExpiredAt(expiredAt);
result.setCommitSeq(commitSeq);
}
return result;

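When the caller sets enable_compress, the meta and job-info payloads are compressed before being returned. GZIPUtils is Doris-internal, but judging by its name and the GZIPOutputStream import above, the step is plain gzip; a standard-library sketch:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

class GzipSketch {
    // Standard-library equivalent of the GZIPUtils.compress(...) step above.
    static byte[] compress(byte[] raw) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        }
        return bos.toByteArray();
    }
}
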
View File (BackupJobTest.java)

@@ -212,7 +212,7 @@ public class BackupJobTest {
new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, UnitTestUtil.TABLE_NAME),
null));
job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
env, repo.getId());
env, repo.getId(), 0);
}
@Test
@@ -348,7 +348,7 @@
new TableRef(new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, UnitTestUtil.DB_NAME, "unknown_tbl"),
null));
job = new BackupJob("label", dbId, UnitTestUtil.DB_NAME, tableRefs, 13600 * 1000, BackupStmt.BackupContent.ALL,
env, repo.getId());
env, repo.getId(), 0);
job.run();
Assert.assertEquals(Status.ErrCode.NOT_FOUND, job.getStatus().getErrCode());
Assert.assertEquals(BackupJobState.CANCELLED, job.getState());

View File (FrontendService.thrift)

@@ -1198,6 +1198,7 @@ struct TGetSnapshotResult {
4: optional Types.TNetworkAddress master_address
5: optional bool compressed;
6: optional i64 expiredAt; // in millis
7: optional i64 commit_seq;
}
struct TTableRef {
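Giving commit_seq a fresh optional field id (7) keeps TGetSnapshotResult wire-compatible: an older frontend simply never sets the field, and a caller can fall back to 0 when it is absent. The setCommitSeq(...) call in FrontendServiceImpl above comes from the accessors Thrift generates for this field.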