[improvement](backup) Add BackupJobInfo with tableCommitSeqMap (#21255)

Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
Author: Jack Drogon
Date: 2023-06-28 11:10:12 +08:00
Committed by: GitHub
Parent: e9bbac71dc
Commit: 08fe22cb0c
7 changed files with 82 additions and 8 deletions

File: BackupJob.java

@@ -75,6 +75,7 @@ import java.util.stream.Collectors;
public class BackupJob extends AbstractJob {
private static final Logger LOG = LogManager.getLogger(BackupJob.class);
private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";
public enum BackupJobState {
PENDING, // Job is newly created. Send snapshot tasks and save copied meta info, then transfer to SNAPSHOTING
@@ -110,7 +111,7 @@ public class BackupJob extends AbstractJob {
// save the local file path of meta info and job info file
private String localMetaInfoFilePath = null;
private String localJobInfoFilePath = null;
- // backup properties
+ // backup properties, plus per-table commit seq keyed by table id
private Map<String, String> properties = Maps.newHashMap();
private byte[] metaInfoBytes = null;
@@ -431,6 +432,12 @@ public class BackupJob extends AbstractJob {
private void prepareSnapshotTaskForOlapTableWithoutLock(OlapTable olapTable,
TableRef backupTableRef, AgentBatchTask batchTask) {
// Add barrier editolog for barrier commit seq
long commitSeq = env.getEditLog().logBarrier();
// key format: "table_commit_seq:{tableId}"
String tableKey = String.format("%s%d", TABLE_COMMIT_SEQ_PREFIX, olapTable.getId());
properties.put(tableKey, String.valueOf(commitSeq));
// check backup table again
if (backupTableRef.getPartitionNames() != null) {
for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
@@ -680,8 +687,20 @@ public class BackupJob extends AbstractJob {
metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
// 3. save job info file
Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
// Convert the recorded properties back into a map: each key is
// "${TABLE_COMMIT_SEQ_PREFIX}{tableId}", so strip the prefix and parse tableId and the value as long.
for (Map.Entry<String, String> entry : properties.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (key.startsWith(TABLE_COMMIT_SEQ_PREFIX)) {
long tableId = Long.parseLong(key.substring(TABLE_COMMIT_SEQ_PREFIX.length()));
long commitSeq = Long.parseLong(value);
tableCommitSeqMap.put(tableId, commitSeq);
}
}
jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId,
- getContent(), backupMeta, snapshotInfos);
+ getContent(), backupMeta, snapshotInfos, tableCommitSeqMap);
LOG.debug("job info: {}. {}", jobInfo, this);
File jobInfoFile = new File(jobDir, Repository.PREFIX_JOB_INFO + createTimeStr);
if (!jobInfoFile.createNewFile()) {
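Taken together, the two hunks above implement a simple round trip: the snapshot-prepare step stashes each table's barrier commit seq in the string-typed properties map under a "table_commit_seq:{tableId}" key, and the job-info step parses those entries back into a Map<Long, Long>. A minimal self-contained sketch of that scheme (sample ids and values are illustrative, not from the commit):

import java.util.HashMap;
import java.util.Map;

public class TableCommitSeqDemo {
    static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // Write side (sample values): record commit seq 42 for table id 10001.
        properties.put(TABLE_COMMIT_SEQ_PREFIX + 10001L, String.valueOf(42L));

        // Read side: strip the prefix, parse both halves back to long.
        Map<Long, Long> tableCommitSeqMap = new HashMap<>();
        for (Map.Entry<String, String> e : properties.entrySet()) {
            if (e.getKey().startsWith(TABLE_COMMIT_SEQ_PREFIX)) {
                long tableId = Long.parseLong(e.getKey().substring(TABLE_COMMIT_SEQ_PREFIX.length()));
                tableCommitSeqMap.put(tableId, Long.parseLong(e.getValue()));
            }
        }
        System.out.println(tableCommitSeqMap); // prints {10001=42}
    }
}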

File: BackupJobInfo.java

@@ -99,6 +99,9 @@ public class BackupJobInfo implements Writable {
@SerializedName("tablet_snapshot_path_map")
public Map<Long, String> tabletSnapshotPathMap = Maps.newHashMap();
@SerializedName("table_commit_seq_map")
public Map<Long, Long> tableCommitSeqMap;
public static class ExtraInfo {
public static class NetworkAddrss {
@SerializedName("ip")
@@ -575,7 +578,7 @@ public class BackupJobInfo implements Writable {
public static BackupJobInfo fromCatalog(long backupTime, String label, String dbName, long dbId,
BackupContent content, BackupMeta backupMeta,
- Map<Long, SnapshotInfo> snapshotInfos) {
+ Map<Long, SnapshotInfo> snapshotInfos, Map<Long, Long> tableCommitSeqMap) {
BackupJobInfo jobInfo = new BackupJobInfo();
jobInfo.backupTime = backupTime;
@@ -584,6 +587,7 @@ public class BackupJobInfo implements Writable {
jobInfo.dbId = dbId;
jobInfo.metaVersion = FeConstants.meta_version;
jobInfo.content = content;
jobInfo.tableCommitSeqMap = tableCommitSeqMap;
Collection<Table> tbls = backupMeta.getTables().values();
// tbls
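The new field rides along in the job-info JSON under the "table_commit_seq_map" key. A hedged illustration of what the @SerializedName mapping produces, using plain Gson directly (Doris serializes through its own persist layer, so treat this only as a sketch of the annotation's effect):

import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import java.util.HashMap;
import java.util.Map;

public class SerializedNameDemo {
    @SerializedName("table_commit_seq_map")
    public Map<Long, Long> tableCommitSeqMap = new HashMap<>();

    public static void main(String[] args) {
        SerializedNameDemo demo = new SerializedNameDemo();
        demo.tableCommitSeqMap.put(10001L, 42L); // sample table id -> commit seq
        // prints {"table_commit_seq_map":{"10001":42}}
        System.out.println(new Gson().toJson(demo));
    }
}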

File: JournalEntity.java

@@ -69,6 +69,7 @@ import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.AnalyzeDeletionLog;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
@@ -829,6 +830,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_BARRIER: {
data = new BarrierLog();
isRead = true;
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);

File: BarrierLog.java (new file)

@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;

import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;

import java.io.DataOutput;
import java.io.IOException;

public class BarrierLog implements Writable {
    public BarrierLog() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, "");
    }
}
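BarrierLog deliberately carries no payload; the useful information is the journal id assigned when the entry is written. The diff only shows the write path. An assumed read counterpart, mirroring write() inside BarrierLog (a guess at the symmetric method, not code from the commit):

import org.apache.doris.common.io.Text;

import java.io.DataInput;
import java.io.IOException;

// Assumed addition to BarrierLog, not shown in the diff above:
public static BarrierLog read(DataInput in) throws IOException {
    Text.readString(in); // consume and discard the empty placeholder string
    return new BarrierLog();
}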

File: EditLog.java

@@ -1037,6 +1037,11 @@ public class EditLog {
env.replayGcBinlog(binlogGcInfo);
break;
}
case OperationType.OP_BARRIER: {
// this log only pins a barrier commit seq; there is nothing to replay
LOG.info("replay barrier");
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1800,11 +1805,15 @@ public class EditLog {
logEdit(OperationType.OP_DELETE_ANALYSIS_TASK, log);
}
- public void logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
- logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
+ public long logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
+ return logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
}
- public void logGcBinlog(BinlogGcInfo log) {
- logEdit(OperationType.OP_GC_BINLOG, log);
+ public long logGcBinlog(BinlogGcInfo log) {
+ return logEdit(OperationType.OP_GC_BINLOG, log);
}
public long logBarrier() {
return logEdit(OperationType.OP_BARRIER, new BarrierLog());
}
}
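The void-to-long signature changes above only make sense if logEdit returns the journal id it assigned; that id is what logBarrier hands back as the barrier commit seq. A simplified stand-in for the pattern (assumed behavior, not Doris's actual EditLog internals):

public class EditLogSketch {
    private long nextJournalId = 0;

    // Stand-in for EditLog.logEdit: the real code persists the entry to the
    // journal first, then returns the id the entry was written under.
    private synchronized long logEdit(short op, Object entry) {
        return ++nextJournalId;
    }

    public long logBarrier() {
        return logEdit((short) 436, new Object()); // 436 == OperationType.OP_BARRIER
    }
}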

File: OperationType.java

@@ -308,6 +308,8 @@ public class OperationType {
public static final short OP_GC_BINLOG = 435;
public static final short OP_BARRIER = 436;
/**
* Get opcode name by op code.

File: BackupHandlerTest.java

@@ -228,7 +228,7 @@ public class BackupHandlerTest {
BackupJobInfo info = BackupJobInfo.fromCatalog(System.currentTimeMillis(),
"ss2", CatalogMocker.TEST_DB_NAME,
CatalogMocker.TEST_DB_ID, BackupStmt.BackupContent.ALL,
- backupMeta, snapshotInfos);
+ backupMeta, snapshotInfos, null);
infos.add(info);
return Status.OK;
}
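Note that the test passes null for the new parameter, so BackupJobInfo.tableCommitSeqMap can legitimately be null after fromCatalog. Any consumer of the field would therefore need a guard along these lines (a hypothetical consumer-side sketch, not code from the commit):

// tableId and jobInfo are assumed to be in scope; -1 is an illustrative sentinel.
Map<Long, Long> seqMap = jobInfo.tableCommitSeqMap; // may be null, as in the test above
long commitSeq = (seqMap == null) ? -1L : seqMap.getOrDefault(tableId, -1L);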