[Enhancement](binlog) Add Barrier log into BinlogManager (#22559)

Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
This commit is contained in:
Jack Drogon
2023-08-04 14:37:12 +08:00
committed by GitHub
parent 34b7f381b1
commit 34164f69ba
6 changed files with 89 additions and 8 deletions

View File

@ -36,6 +36,7 @@ import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.View;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
@ -376,7 +377,7 @@ public class BackupJob extends AbstractJob {
OlapTable olapTable = (OlapTable) tbl;
checkOlapTable(olapTable, tableRef);
if (getContent() == BackupContent.ALL) {
prepareSnapshotTaskForOlapTableWithoutLock((OlapTable) tbl, tableRef, batchTask);
prepareSnapshotTaskForOlapTableWithoutLock(db, (OlapTable) tbl, tableRef, batchTask);
}
prepareBackupMetaForOlapTableWithoutLock(tableRef, olapTable, copiedTables);
break;
@ -430,10 +431,15 @@ public class BackupJob extends AbstractJob {
}
}
private void prepareSnapshotTaskForOlapTableWithoutLock(OlapTable olapTable,
private void prepareSnapshotTaskForOlapTableWithoutLock(Database db, OlapTable olapTable,
TableRef backupTableRef, AgentBatchTask batchTask) {
// Add barrier editolog for barrier commit seq
long commitSeq = env.getEditLog().logBarrier();
long dbId = db.getId();
String dbName = db.getFullName();
long tableId = olapTable.getId();
String tableName = olapTable.getName();
BarrierLog barrierLog = new BarrierLog(dbId, dbName, tableId, tableName);
long commitSeq = env.getEditLog().logBarrier(barrierLog);
// format as "table:{tableId}"
String tableKey = String.format("%s%d", TABLE_COMMIT_SEQ_PREFIX, olapTable.getId());
properties.put(tableKey, String.valueOf(commitSeq));

View File

@ -23,6 +23,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
@ -247,6 +248,27 @@ public class BinlogManager {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, true);
}
// add Barrier log
public void addBarrierLog(BarrierLog barrierLog, long commitSeq) {
if (barrierLog == null) {
return;
}
long dbId = barrierLog.getDbId();
long tableId = barrierLog.getTableId();
if (dbId == 0 || tableId == 0) {
return;
}
List<Long> tableIds = Lists.newArrayList();
tableIds.add(tableId);
long timestamp = -1;
TBinlogType type = TBinlogType.BARRIER;
String data = barrierLog.toJson();
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false);
}
// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);

View File

@ -853,7 +853,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_BARRIER: {
data = new BarrierLog();
data = BarrierLog.read(in);
isRead = true;
break;
}

View File

@ -19,16 +19,65 @@ package org.apache.doris.persist;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class BarrierLog implements Writable {
@SerializedName(value = "dbId")
long dbId = 0L;
@SerializedName(value = "dbName")
String dbName;
@SerializedName(value = "tableId")
long tableId = 0L;
@SerializedName(value = "tableName")
String tableName;
public BarrierLog() {
}
public BarrierLog(long dbId, String dbName, long tableId, String tableName) {
this.dbId = dbId;
this.dbName = dbName;
this.tableId = tableId;
this.tableName = tableName;
}
public long getDbId() {
return dbId;
}
public String getDbName() {
return dbName;
}
public long getTableId() {
return tableId;
}
public String getTableName() {
return tableName;
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, "");
}
public static BarrierLog read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(in), BarrierLog.class);
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
@Override
public String toString() {
return toJson();
}
}

View File

@ -1077,8 +1077,8 @@ public class EditLog {
break;
}
case OperationType.OP_BARRIER: {
// the log only for barrier commit seq, not need to replay
LOG.info("replay barrier");
BarrierLog log = (BarrierLog) journal.getData();
env.getBinlogManager().addBarrierLog(log, logId);
break;
}
case OperationType.OP_UPDATE_AUTO_INCREMENT_ID: {
@ -1893,8 +1893,11 @@ public class EditLog {
return logEdit(OperationType.OP_GC_BINLOG, log);
}
public long logBarrier() {
return logEdit(OperationType.OP_BARRIER, new BarrierLog());
public long logBarrier(BarrierLog log) {
long logId = logEdit(OperationType.OP_BARRIER, log);
Env.getCurrentEnv().getBinlogManager().addBarrierLog(log, logId);
LOG.info("logId {}, barrier {}", logId, log);
return logId;
}
public void logUpdateAutoIncrementId(AutoIncrementIdUpdateLog log) {