cherry pick from #44418
This commit is contained in:
@ -469,6 +469,10 @@ public class IndexChangeJob implements Writable {
|
||||
LOG.info("cancel index job {}, err: {}", jobId, errMsg);
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return GsonUtils.GSON.toJson(this);
|
||||
}
|
||||
|
||||
public static IndexChangeJob read(DataInput in) throws IOException {
|
||||
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_122) {
|
||||
IndexChangeJob job = new IndexChangeJob();
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.binlog;
|
||||
|
||||
import org.apache.doris.alter.AlterJobV2;
|
||||
import org.apache.doris.alter.IndexChangeJob;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
@ -35,6 +36,7 @@ import org.apache.doris.persist.ModifyTablePropertyOperationLog;
|
||||
import org.apache.doris.persist.ReplacePartitionOperationLog;
|
||||
import org.apache.doris.persist.ReplaceTableOperationLog;
|
||||
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
|
||||
import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
|
||||
import org.apache.doris.persist.TableInfo;
|
||||
import org.apache.doris.persist.TableRenameColumnInfo;
|
||||
import org.apache.doris.persist.TruncateTableInfo;
|
||||
@ -377,6 +379,28 @@ public class BinlogManager {
|
||||
addBarrierLog(log, commitSeq);
|
||||
}
|
||||
|
||||
public void addModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info, long commitSeq) {
|
||||
long dbId = info.getDbId();
|
||||
List<Long> tableIds = Lists.newArrayList();
|
||||
tableIds.add(info.getTableId());
|
||||
long timestamp = -1;
|
||||
TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES;
|
||||
String data = info.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, info);
|
||||
}
|
||||
|
||||
public void addIndexChangeJob(IndexChangeJob indexChangeJob, long commitSeq) {
|
||||
long dbId = indexChangeJob.getDbId();
|
||||
List<Long> tableIds = Lists.newArrayList();
|
||||
tableIds.add(indexChangeJob.getTableId());
|
||||
long timestamp = -1;
|
||||
TBinlogType type = TBinlogType.INDEX_CHANGE_JOB;
|
||||
String data = indexChangeJob.toJson();
|
||||
|
||||
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data, false, indexChangeJob);
|
||||
}
|
||||
|
||||
// get binlog by dbId, return first binlog.version > version
|
||||
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
|
||||
TStatus status = new TStatus(TStatusCode.OK);
|
||||
|
||||
@ -977,11 +977,13 @@ public class EditLog {
|
||||
final TableAddOrDropInvertedIndicesInfo info =
|
||||
(TableAddOrDropInvertedIndicesInfo) journal.getData();
|
||||
env.getSchemaChangeHandler().replayModifyTableAddOrDropInvertedIndices(info);
|
||||
env.getBinlogManager().addModifyTableAddOrDropInvertedIndices(info, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_INVERTED_INDEX_JOB: {
|
||||
IndexChangeJob indexChangeJob = (IndexChangeJob) journal.getData();
|
||||
env.getSchemaChangeHandler().replayIndexChangeJob(indexChangeJob);
|
||||
env.getBinlogManager().addIndexChangeJob(indexChangeJob, logId);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_CLEAN_LABEL: {
|
||||
@ -1997,11 +1999,17 @@ public class EditLog {
|
||||
}
|
||||
|
||||
public void logModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info) {
|
||||
logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES, info);
|
||||
long logId = logEdit(OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES, info);
|
||||
LOG.info("walter log modify table add or drop inverted indices, infos: {}, json: {}",
|
||||
info, info.toJson(), new RuntimeException("test"));
|
||||
Env.getCurrentEnv().getBinlogManager().addModifyTableAddOrDropInvertedIndices(info, logId);
|
||||
}
|
||||
|
||||
public void logIndexChangeJob(IndexChangeJob indexChangeJob) {
|
||||
logEdit(OperationType.OP_INVERTED_INDEX_JOB, indexChangeJob);
|
||||
long logId = logEdit(OperationType.OP_INVERTED_INDEX_JOB, indexChangeJob);
|
||||
LOG.info("walter log inverted index job, infos: {}, json: {}",
|
||||
indexChangeJob, indexChangeJob.toJson(), new RuntimeException("test"));
|
||||
Env.getCurrentEnv().getBinlogManager().addIndexChangeJob(indexChangeJob, logId);
|
||||
}
|
||||
|
||||
public void logCleanLabel(CleanLabelOperationLog log) {
|
||||
|
||||
@ -95,6 +95,10 @@ public class TableAddOrDropInvertedIndicesInfo implements Writable {
|
||||
return jobId;
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return GsonUtils.GSON.toJson(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Text.writeString(out, GsonUtils.GSON.toJson(this));
|
||||
|
||||
@ -1142,6 +1142,8 @@ enum TBinlogType {
|
||||
MODIFY_COMMENT = 16,
|
||||
MODIFY_VIEW_DEF = 17,
|
||||
REPLACE_TABLE = 18,
|
||||
MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES = 19,
|
||||
INDEX_CHANGE_JOB = 20,
|
||||
|
||||
// Keep some IDs for allocation so that when new binlog types are added in the
|
||||
// future, the changes can be picked back to the old versions without breaking
|
||||
@ -1158,9 +1160,7 @@ enum TBinlogType {
|
||||
// MODIFY_XXX = 17,
|
||||
// MIN_UNKNOWN = 18,
|
||||
// UNKNOWN_3 = 19,
|
||||
MIN_UNKNOWN = 19,
|
||||
UNKNOWN_4 = 20,
|
||||
UNKNOWN_5 = 21,
|
||||
MIN_UNKNOWN = 21,
|
||||
UNKNOWN_6 = 22,
|
||||
UNKNOWN_7 = 23,
|
||||
UNKNOWN_8 = 24,
|
||||
|
||||
Reference in New Issue
Block a user