[Enhancement](binlog) Add create/drop table, add/drop paritition && alter job, modify columns binlog support (#21544)

This commit is contained in:
Jack Drogon
2023-07-09 09:11:56 +08:00
committed by GitHub
parent c36cd18a08
commit aacb9b9b66
24 changed files with 368 additions and 71 deletions

View File

@ -219,7 +219,7 @@ public class Alter {
((SchemaChangeHandler) schemaChangeHandler).updateBinlogConfig(db, olapTable, alterClauses);
} else if (currentAlterOps.hasSchemaChangeOp()) {
// if modify storage type to v2, do schema change to convert all related tablets to segment v2 format
schemaChangeHandler.process(alterClauses, clusterName, db, olapTable);
schemaChangeHandler.process(stmt.toSql(), alterClauses, clusterName, db, olapTable);
} else if (currentAlterOps.hasRollupOp()) {
materializedViewHandler.process(alterClauses, clusterName, db, olapTable);
} else if (currentAlterOps.hasPartitionOp()) {

View File

@ -173,9 +173,18 @@ public abstract class AlterHandler extends MasterDaemon {
/*
* entry function. handle alter ops
*/
public abstract void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable)
public abstract void process(String rawSql, List<AlterClause> alterClauses, String clusterName, Database db,
OlapTable olapTable)
throws UserException;
/*
* entry function. handle alter ops
*/
public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable)
throws UserException {
process("", alterClauses, clusterName, db, olapTable);
}
/*
* entry function. handle alter ops for external table
*/

View File

@ -86,8 +86,12 @@ public abstract class AlterJobV2 implements Writable {
protected long finishedTimeMs = -1;
@SerializedName(value = "timeoutMs")
protected long timeoutMs = -1;
@SerializedName(value = "rawSql")
protected String rawSql;
public AlterJobV2(long jobId, JobType jobType, long dbId, long tableId, String tableName, long timeoutMs) {
public AlterJobV2(String rawSql, long jobId, JobType jobType, long dbId, long tableId, String tableName,
long timeoutMs) {
this.rawSql = rawSql;
this.jobId = jobId;
this.type = jobType;
this.dbId = dbId;
@ -240,4 +244,8 @@ public abstract class AlterJobV2 implements Writable {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, AlterJobV2.class);
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
}

View File

@ -208,10 +208,11 @@ public class MaterializedViewHandler extends AlterHandler {
List<Column> mvColumns = checkAndPrepareMaterializedView(addMVClause, olapTable);
// Step2: create mv job
RollupJobV2 rollupJobV2 = createMaterializedViewJob(mvIndexName, baseIndexName, mvColumns,
addMVClause.getWhereClauseItemExpr(olapTable),
addMVClause.getProperties(), olapTable, db, baseIndexId,
addMVClause.getMVKeysType(), addMVClause.getOrigStmt());
RollupJobV2 rollupJobV2 =
createMaterializedViewJob(addMVClause.toSql(), mvIndexName, baseIndexName, mvColumns,
addMVClause.getWhereClauseItemExpr(olapTable),
addMVClause.getProperties(), olapTable, db, baseIndexId,
addMVClause.getMVKeysType(), addMVClause.getOrigStmt());
addAlterJobV2(rollupJobV2);
@ -236,7 +237,7 @@ public class MaterializedViewHandler extends AlterHandler {
* @throws DdlException
* @throws AnalysisException
*/
public void processBatchAddRollup(List<AlterClause> alterClauses, Database db, OlapTable olapTable)
public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses, Database db, OlapTable olapTable)
throws DdlException, AnalysisException {
checkReplicaCount(olapTable);
Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
@ -285,8 +286,10 @@ public class MaterializedViewHandler extends AlterHandler {
addRollupClause, olapTable, baseIndexId, changeStorageFormat);
// step 3 create rollup job
RollupJobV2 alterJobV2 = createMaterializedViewJob(rollupIndexName, baseIndexName, rollupSchema, null,
addRollupClause.getProperties(), olapTable, db, baseIndexId, olapTable.getKeysType(), null);
RollupJobV2 alterJobV2 =
createMaterializedViewJob(rawSql, rollupIndexName, baseIndexName, rollupSchema, null,
addRollupClause.getProperties(), olapTable, db, baseIndexId, olapTable.getKeysType(),
null);
rollupNameJobMap.put(addRollupClause.getRollupName(), alterJobV2);
logJobIdSet.add(alterJobV2.getJobId());
@ -335,7 +338,7 @@ public class MaterializedViewHandler extends AlterHandler {
* @throws DdlException
* @throws AnalysisException
*/
private RollupJobV2 createMaterializedViewJob(String mvName, String baseIndexName,
private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, String baseIndexName,
List<Column> mvColumns, Column whereColumn, Map<String, String> properties,
OlapTable olapTable, Database db, long baseIndexId, KeysType mvKeysType,
OriginStatement origStmt) throws DdlException, AnalysisException {
@ -364,7 +367,7 @@ public class MaterializedViewHandler extends AlterHandler {
IdGeneratorBuffer idGeneratorBuffer = env.getIdGeneratorBuffer(bufferSize);
long jobId = idGeneratorBuffer.getNextId();
long mvIndexId = idGeneratorBuffer.getNextId();
RollupJobV2 mvJob = new RollupJobV2(jobId, dbId, tableId, olapTable.getName(), timeoutMs,
RollupJobV2 mvJob = new RollupJobV2(rawSql, jobId, dbId, tableId, olapTable.getName(), timeoutMs,
baseIndexId, mvIndexId, baseIndexName, mvName,
mvColumns, whereColumn, baseSchemaHash, mvSchemaHash,
mvKeysType, mvShortKeyColumnCount, origStmt);
@ -1196,7 +1199,8 @@ public class MaterializedViewHandler extends AlterHandler {
}
@Override
public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable)
public void process(String rawSql, List<AlterClause> alterClauses, String clusterName, Database db,
OlapTable olapTable)
throws DdlException, AnalysisException, MetaNotFoundException {
if (olapTable.isDuplicateWithoutKey()) {
throw new DdlException("Duplicate table without keys do not support alter rollup!");
@ -1204,7 +1208,7 @@ public class MaterializedViewHandler extends AlterHandler {
Optional<AlterClause> alterClauseOptional = alterClauses.stream().findAny();
if (alterClauseOptional.isPresent()) {
if (alterClauseOptional.get() instanceof AddRollupClause) {
processBatchAddRollup(alterClauses, db, olapTable);
processBatchAddRollup(rawSql, alterClauses, db, olapTable);
} else if (alterClauseOptional.get() instanceof DropRollupClause) {
processBatchDropRollup(alterClauses, db, olapTable);
} else {

View File

@ -148,12 +148,14 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
super(JobType.ROLLUP);
}
public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId,
long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema,
Column whereColumn,
int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, short rollupShortKeyColumnCount,
OriginStatement origStmt) throws AnalysisException {
super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs);
public RollupJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs,
long baseIndexId,
long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema,
Column whereColumn,
int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType,
short rollupShortKeyColumnCount,
OriginStatement origStmt) throws AnalysisException {
super(rawSql, jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs);
this.baseIndexId = baseIndexId;
this.rollupIndexId = rollupIndexId;
@ -883,4 +885,9 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
}
setColumnsDefineExpr(stmt.getMVColumnItemList());
}
@Override
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
}

View File

@ -1171,7 +1171,7 @@ public class SchemaChangeHandler extends AlterHandler {
}
}
private void createJob(long dbId, OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap,
private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long, LinkedList<Column>> indexSchemaMap,
Map<String, String> propertyMap, List<Index> indexes) throws UserException {
checkReplicaCount(olapTable);
@ -1463,8 +1463,9 @@ public class SchemaChangeHandler extends AlterHandler {
long bufferSize = IdGeneratorUtil.getBufferSizeForAlterTable(olapTable, changedIndexIdToSchema.keySet());
IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
long jobId = idGeneratorBuffer.getNextId();
SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, dbId, olapTable.getId(), olapTable.getName(),
timeoutSecond * 1000);
SchemaChangeJobV2 schemaChangeJob =
new SchemaChangeJobV2(rawSql, jobId, dbId, olapTable.getId(), olapTable.getName(),
timeoutSecond * 1000);
schemaChangeJob.setBloomFilterInfo(hasBfChange, bfColumns, bfFpp);
schemaChangeJob.setAlterIndexInfo(hasIndexChange, indexes);
@ -1741,7 +1742,8 @@ public class SchemaChangeHandler extends AlterHandler {
}
@Override
public void process(List<AlterClause> alterClauses, String clusterName, Database db, OlapTable olapTable)
public void process(String rawSql, List<AlterClause> alterClauses, String clusterName, Database db,
OlapTable olapTable)
throws UserException {
olapTable.writeLockOrDdlException();
try {
@ -1985,18 +1987,18 @@ public class SchemaChangeHandler extends AlterHandler {
if (lightSchemaChange) {
long jobId = Env.getCurrentEnv().getNextId();
//for schema change add/drop value column optimize, direct modify table meta.
modifyTableLightSchemaChange(db, olapTable, indexSchemaMap, newIndexes,
modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes,
null, isDropIndex, jobId, false);
} else if (lightIndexChange) {
long jobId = Env.getCurrentEnv().getNextId();
//for schema change add/drop inverted index optimize, direct modify table meta firstly.
modifyTableLightSchemaChange(db, olapTable, indexSchemaMap, newIndexes,
modifyTableLightSchemaChange(rawSql, db, olapTable, indexSchemaMap, newIndexes,
alterIndexes, isDropIndex, jobId, false);
} else if (buildIndexChange) {
buildOrDeleteTableInvertedIndices(db, olapTable, indexSchemaMap,
alterIndexes, invertedIndexOnPartitions, false);
} else {
createJob(db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes);
createJob(rawSql, db.getId(), olapTable, indexSchemaMap, propertyMap, newIndexes);
}
} finally {
olapTable.writeUnlock();
@ -2459,7 +2461,7 @@ public class SchemaChangeHandler extends AlterHandler {
}
// the invoker should keep table's write lock
public void modifyTableLightSchemaChange(Database db, OlapTable olapTable,
public void modifyTableLightSchemaChange(String rawSql, Database db, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes,
List<Index> alterIndexes, boolean isDropIndex,
long jobId, boolean isReplay)
@ -2488,7 +2490,7 @@ public class SchemaChangeHandler extends AlterHandler {
//for compatibility, we need create a finished state schema change job v2
SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(jobId, db.getId(), olapTable.getId(),
SchemaChangeJobV2 schemaChangeJob = new SchemaChangeJobV2(rawSql, jobId, db.getId(), olapTable.getId(),
olapTable.getName(), 1000);
for (Map.Entry<Long, List<Column>> entry : changedIndexIdToSchema.entrySet()) {
@ -2517,9 +2519,8 @@ public class SchemaChangeHandler extends AlterHandler {
if (alterIndexes != null) {
if (!isReplay) {
TableAddOrDropInvertedIndicesInfo info = new TableAddOrDropInvertedIndicesInfo(
db.getId(), olapTable.getId(), indexSchemaMap, indexes,
alterIndexes, isDropIndex, jobId);
TableAddOrDropInvertedIndicesInfo info = new TableAddOrDropInvertedIndicesInfo(rawSql, db.getId(),
olapTable.getId(), indexSchemaMap, indexes, alterIndexes, isDropIndex, jobId);
LOG.debug("logModifyTableAddOrDropInvertedIndices info:{}", info);
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropInvertedIndices(info);
@ -2542,7 +2543,7 @@ public class SchemaChangeHandler extends AlterHandler {
olapTable.getName(), jobId, isReplay);
} else {
if (!isReplay) {
TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(db.getId(), olapTable.getId(),
TableAddOrDropColumnsInfo info = new TableAddOrDropColumnsInfo(rawSql, db.getId(), olapTable.getId(),
indexSchemaMap, indexes, jobId);
LOG.debug("logModifyTableAddOrDropColumns info:{}", info);
Env.getCurrentEnv().getEditLog().logModifyTableAddOrDropColumns(info);
@ -2564,7 +2565,7 @@ public class SchemaChangeHandler extends AlterHandler {
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
modifyTableLightSchemaChange(db, olapTable, indexSchemaMap, indexes, null, false, jobId, true);
modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, indexes, null, false, jobId, true);
} catch (DdlException e) {
// should not happen
LOG.warn("failed to replay modify table add or drop or modify columns", e);
@ -2695,7 +2696,7 @@ public class SchemaChangeHandler extends AlterHandler {
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
modifyTableLightSchemaChange(db, olapTable, indexSchemaMap, newIndexes,
modifyTableLightSchemaChange("", db, olapTable, indexSchemaMap, newIndexes,
alterIndexes, isDropIndex, jobId, true);
} catch (UserException e) {
// should not happen

View File

@ -140,8 +140,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
super(JobType.SCHEMA_CHANGE);
}
public SchemaChangeJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs) {
super(jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs);
public SchemaChangeJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs) {
super(rawSql, jobId, JobType.SCHEMA_CHANGE, dbId, tableId, tableName, timeoutMs);
}
public void addTabletIdMap(long partitionId, long shadowIdxId, long shadowTabletId, long originTabletId) {
@ -937,4 +937,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
String json = GsonUtils.GSON.toJson(this, AlterJobV2.class);
Text.writeString(out, json);
}
@Override
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
}

View File

@ -105,8 +105,9 @@ public class SystemHandler extends AlterHandler {
@Override
// add synchronized to avoid process 2 or more stmts at same time
public synchronized void process(List<AlterClause> alterClauses, String clusterName, Database dummyDb,
OlapTable dummyTbl) throws UserException {
public synchronized void process(String rawSql, List<AlterClause> alterClauses, String clusterName,
Database dummyDb,
OlapTable dummyTbl) throws UserException {
Preconditions.checkArgument(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);

View File

@ -141,4 +141,12 @@ public class AbstractBackupStmt extends DdlStmt {
public long getTimeoutMs() {
return timeoutMs;
}
public void setProperty(String key, String value) {
properties.put(key, value);
}
public void removeProperty(String key) {
properties.remove(key);
}
}

View File

@ -70,8 +70,8 @@ public class IndexDef {
this.properties = properties;
}
if (indexType == IndexType.NGRAM_BF) {
properties.putIfAbsent(NGRAM_SIZE_KEY, DEFAULT_NGRAM_SIZE);
properties.putIfAbsent(NGRAM_BF_SIZE_KEY, DEFAULT_NGRAM_BF_SIZE);
this.properties.putIfAbsent(NGRAM_SIZE_KEY, DEFAULT_NGRAM_SIZE);
this.properties.putIfAbsent(NGRAM_BF_SIZE_KEY, DEFAULT_NGRAM_BF_SIZE);
}
}
@ -122,23 +122,25 @@ public class IndexDef {
if (tableName != null && !tableName.isEmpty()) {
sb.append(" ON ").append(tableName);
}
sb.append(" (");
boolean first = true;
for (String col : columns) {
if (first) {
first = false;
} else {
sb.append(",");
if (columns != null && columns.size() > 0) {
sb.append(" (");
boolean first = true;
for (String col : columns) {
if (first) {
first = false;
} else {
sb.append(",");
}
sb.append("`" + col + "`");
}
sb.append("`" + col + "`");
sb.append(")");
}
sb.append(")");
if (indexType != null) {
sb.append(" USING ").append(indexType.toString());
}
if (properties != null && properties.size() > 0) {
sb.append(" PROPERTIES(");
first = true;
boolean first = true;
for (Map.Entry<String, String> e : properties.entrySet()) {
if (first) {
first = false;

View File

@ -98,6 +98,10 @@ public class RestoreStmt extends AbstractBackupStmt {
return jobInfo;
}
public void disableDynamicPartition() {
setProperty(PROP_RESERVE_DYNAMIC_PARTITION_ENABLE, "false");
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
if (repoName.equals(Repository.KEEP_ON_LOCAL_REPO_NAME)) {

View File

@ -17,12 +17,14 @@
package org.apache.doris.binlog;
import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
@ -129,6 +131,18 @@ public class BinlogManager {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
}
public void addCreateTableRecord(CreateTableRecord createTableRecord) {
long dbId = createTableRecord.getDbId();
List<Long> tableIds = new ArrayList<Long>();
tableIds.add(createTableRecord.getTableId());
long commitSeq = createTableRecord.getCommitSeq();
long timestamp = -1;
TBinlogType type = TBinlogType.CREATE_TABLE;
String data = createTableRecord.toJson();
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
}
public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) {
long dbId = dropPartitionInfo.getDbId();
List<Long> tableIds = new ArrayList<Long>();
@ -140,6 +154,40 @@ public class BinlogManager {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
}
public void addDropTableRecord(DropTableRecord record) {
long dbId = record.getDbId();
List<Long> tableIds = new ArrayList<Long>();
tableIds.add(record.getTableId());
long commitSeq = record.getCommitSeq();
long timestamp = -1;
TBinlogType type = TBinlogType.DROP_TABLE;
String data = record.toJson();
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
}
public void addAlterJobV2(AlterJobV2 alterJob, long commitSeq) {
long dbId = alterJob.getDbId();
List<Long> tableIds = new ArrayList<Long>();
tableIds.add(alterJob.getTableId());
long timestamp = -1;
TBinlogType type = TBinlogType.ALTER_JOB;
String data = alterJob.toJson();
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
}
public void addModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info, long commitSeq) {
long dbId = info.getDbId();
List<Long> tableIds = new ArrayList<Long>();
tableIds.add(info.getTableId());
long timestamp = -1;
TBinlogType type = TBinlogType.MODIFY_TABLE_ADD_OR_DROP_COLUMNS;
String data = info.toJson();
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
}
// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long prevCommitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
@ -329,10 +377,6 @@ public class BinlogManager {
}
public long read(DataInputStream dis, long checksum) throws IOException {
if (!Config.enable_feature_binlog) {
return checksum;
}
// Step 1: read binlogs length
int size = dis.readInt();
LOG.info("read binlogs length: {}", size);

View File

@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.binlog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
public class CreateTableRecord {
private static final Logger LOG = LogManager.getLogger(CreateTableRecord.class);
@SerializedName(value = "commitSeq")
private long commitSeq;
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "sql")
private String sql;
public CreateTableRecord(long commitSeq, CreateTableInfo info) {
Table table = info.getTable();
this.commitSeq = commitSeq;
this.tableId = table.getId();
String dbName = info.getDbName();
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
if (db == null) {
LOG.warn("db not found. dbId: {}", dbId);
this.dbId = -1L;
} else {
this.dbId = db.getId();
}
List<String> createTableStmt = Lists.newArrayList();
List<String> addPartitionStmt = Lists.newArrayList();
List<String> createRollupStmt = Lists.newArrayList();
table.readLock();
try {
Env.getDdlStmt(table, createTableStmt, addPartitionStmt, createRollupStmt, false, false /* show password */,
-1L);
} finally {
table.readUnlock();
}
if (createTableStmt.size() > 0) {
this.sql = createTableStmt.get(0);
} else {
this.sql = "";
}
}
public long getCommitSeq() {
return commitSeq;
}
public long getDbId() {
return dbId;
}
public long getTableId() {
return tableId;
}
public String getSql() {
return sql;
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
@Override
public String toString() {
return toJson();
}
}

View File

@ -83,6 +83,19 @@ public class DBBinlog {
return;
}
// HACK: for metadata fix
if (!binlog.isSetType()) {
return;
}
switch (binlog.getType()) {
case CREATE_TABLE:
return;
case DROP_TABLE:
return;
default:
break;
}
for (long tableId : tableIds) {
TableBinlog tableBinlog = tableBinlogMap.get(tableId);
if (tableBinlog == null) {

View File

@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.binlog;
import org.apache.doris.persist.DropInfo;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
public class DropTableRecord {
@SerializedName(value = "commitSeq")
private long commitSeq;
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tableId")
private long tableId;
public DropTableRecord(long commitSeq, DropInfo info) {
this.commitSeq = commitSeq;
this.dbId = info.getDbId();
this.tableId = info.getTableId();
}
public long getCommitSeq() {
return commitSeq;
}
public long getDbId() {
return dbId;
}
public long getTableId() {
return tableId;
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
@Override
public String toString() {
return toJson();
}
}

View File

@ -149,15 +149,17 @@ public class TableBinlog {
Iterator<TBinlog> iter = binlogs.iterator();
while (iter.hasNext()) {
TBinlog binlog = iter.next();
if (binlog.getTimestamp() <= expireMs) {
if (binlog.getType() == TBinlogType.UPSERT) {
tombstoneUpsert = binlog;
}
largestExpiredCommitSeq = binlog.getCommitSeq();
iter.remove();
} else {
long timestamp = binlog.getTimestamp();
if (timestamp > expireMs) {
break;
}
if (binlog.getType() == TBinlogType.UPSERT) {
tombstoneUpsert = binlog;
}
largestExpiredCommitSeq = binlog.getCommitSeq();
iter.remove();
}
} finally {
lock.writeLock().unlock();

View File

@ -25,6 +25,8 @@ import org.apache.doris.backup.BackupJob;
import org.apache.doris.backup.Repository;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.binlog.AddPartitionRecord;
import org.apache.doris.binlog.CreateTableRecord;
import org.apache.doris.binlog.DropTableRecord;
import org.apache.doris.binlog.UpsertRecord;
import org.apache.doris.blockrule.SqlBlockRule;
import org.apache.doris.catalog.BrokerMgr;
@ -210,7 +212,9 @@ public class EditLog {
CreateTableInfo info = (CreateTableInfo) journal.getData();
LOG.info("Begin to unprotect create table. db = " + info.getDbName() + " table = " + info.getTable()
.getId());
CreateTableRecord record = new CreateTableRecord(logId, info);
env.replayCreateTable(info.getDbName(), info.getTable());
env.getBinlogManager().addCreateTableRecord(record);
break;
}
case OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA: {
@ -225,7 +229,9 @@ public class EditLog {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(info.getDbId());
LOG.info("Begin to unprotect drop table. db = " + db.getFullName() + " table = "
+ info.getTableId());
DropTableRecord record = new DropTableRecord(logId, info);
env.replayDropTable(db, info.getTableId(), info.isForceDrop(), info.getRecycleTime());
env.getBinlogManager().addDropTableRecord(record);
break;
}
case OperationType.OP_ADD_PARTITION: {
@ -715,6 +721,7 @@ public class EditLog {
default:
break;
}
env.getBinlogManager().addAlterJobV2(alterJob, logId);
break;
}
case OperationType.OP_UPDATE_COOLDOWN_CONF:
@ -868,6 +875,7 @@ public class EditLog {
case OperationType.OP_MODIFY_TABLE_LIGHT_SCHEMA_CHANGE: {
final TableAddOrDropColumnsInfo info = (TableAddOrDropColumnsInfo) journal.getData();
env.getSchemaChangeHandler().replayModifyTableLightSchemaChange(info);
env.getBinlogManager().addModifyTableAddOrDropColumns(info, logId);
break;
}
case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: {
@ -1198,7 +1206,9 @@ public class EditLog {
}
public void logCreateTable(CreateTableInfo info) {
logEdit(OperationType.OP_CREATE_TABLE, info);
long logId = logEdit(OperationType.OP_CREATE_TABLE, info);
CreateTableRecord record = new CreateTableRecord(logId, info);
Env.getCurrentEnv().getBinlogManager().addCreateTableRecord(record);
}
public void logRefreshExternalTableSchema(RefreshExternalTableInfo info) {
@ -1235,7 +1245,9 @@ public class EditLog {
}
public void logDropTable(DropInfo info) {
logEdit(OperationType.OP_DROP_TABLE, info);
long logId = logEdit(OperationType.OP_DROP_TABLE, info);
DropTableRecord record = new DropTableRecord(logId, info);
Env.getCurrentEnv().getBinlogManager().addDropTableRecord(record);
}
public void logEraseTable(long tableId) {
@ -1586,7 +1598,8 @@ public class EditLog {
}
public void logAlterJob(AlterJobV2 alterJob) {
logEdit(OperationType.OP_ALTER_JOB_V2, alterJob);
long logId = logEdit(OperationType.OP_ALTER_JOB_V2, alterJob);
Env.getCurrentEnv().getBinlogManager().addAlterJobV2(alterJob, logId);
}
public void logUpdateCooldownConf(CooldownConfList cooldownConf) {
@ -1766,7 +1779,8 @@ public class EditLog {
}
public void logModifyTableAddOrDropColumns(TableAddOrDropColumnsInfo info) {
logEdit(OperationType.OP_MODIFY_TABLE_LIGHT_SCHEMA_CHANGE, info);
long logId = logEdit(OperationType.OP_MODIFY_TABLE_LIGHT_SCHEMA_CHANGE, info);
Env.getCurrentEnv().getBinlogManager().addModifyTableAddOrDropColumns(info, logId);
}
public void logModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info) {

View File

@ -46,9 +46,12 @@ public class TableAddOrDropColumnsInfo implements Writable {
private List<Index> indexes;
@SerializedName(value = "jobId")
private long jobId;
@SerializedName(value = "rawSql")
private String rawSql;
public TableAddOrDropColumnsInfo(long dbId, long tableId,
public TableAddOrDropColumnsInfo(String rawSql, long dbId, long tableId,
Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes, long jobId) {
this.rawSql = rawSql;
this.dbId = dbId;
this.tableId = tableId;
this.indexSchemaMap = indexSchemaMap;
@ -112,4 +115,8 @@ public class TableAddOrDropColumnsInfo implements Writable {
sb.append(" jobId: ").append(jobId);
return sb.toString();
}
public String toJson() {
return GsonUtils.GSON.toJson(this);
}
}

View File

@ -50,11 +50,14 @@ public class TableAddOrDropInvertedIndicesInfo implements Writable {
private boolean isDropInvertedIndex;
@SerializedName(value = "jobId")
private long jobId;
@SerializedName(value = "rawSql")
private String rawSql;
public TableAddOrDropInvertedIndicesInfo(long dbId, long tableId,
public TableAddOrDropInvertedIndicesInfo(String rawSql, long dbId, long tableId,
Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes,
List<Index> alterInvertedIndexes, boolean isDropInvertedIndex,
long jobId) {
this.rawSql = rawSql;
this.dbId = dbId;
this.tableId = tableId;
this.indexSchemaMap = indexSchemaMap;

View File

@ -481,6 +481,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
List<Index> newIndexes = olapTable.getCopiedIndexes();
long jobId = Env.getCurrentEnv().getNextId();
Env.getCurrentEnv().getSchemaChangeHandler().modifyTableLightSchemaChange(
"",
db, olapTable, indexSchemaMap, newIndexes, null, false, jobId, false);
} else {
throw new MetaNotFoundException("table_id "
@ -2523,6 +2524,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
Map<String, String> properties = request.getProperties();
RestoreStmt restoreStmt = new RestoreStmt(label, repoName, null, properties, request.getMeta(),
request.getJobInfo());
restoreStmt.disableDynamicPartition();
LOG.trace("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
ConnectContext ctx = ConnectContext.get();

View File

@ -327,7 +327,7 @@ public class RollupJobV2Test {
Column column = new Column(mvColumnName, Type.BITMAP, false, AggregateType.BITMAP_UNION, false, "1", "");
columns.add(column);
RollupJobV2 rollupJobV2 = new RollupJobV2(1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, null, 1, 1,
RollupJobV2 rollupJobV2 = new RollupJobV2("", 1, 1, 1, "test", 1, 1, 1, "test", "rollup", columns, null, 1, 1,
KeysType.AGG_KEYS, keysCount,
new OriginStatement("create materialized view rollup as select bitmap_union(to_bitmap(c1)) from test",
0));

View File

@ -399,7 +399,7 @@ public class SchemaChangeJobV2Test {
file.deleteOnExit();
DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2(1, 1, 1, "test", 600000);
SchemaChangeJobV2 schemaChangeJobV2 = new SchemaChangeJobV2("", 1, 1, 1, "test", 600000);
schemaChangeJobV2.setStorageFormat(TStorageFormat.V2);
Deencapsulation.setField(schemaChangeJobV2, "jobState", AlterJobV2.JobState.FINISHED);
Map<Long, SchemaVersionAndHash> indexSchemaVersionAndHashMap = Maps.newHashMap();

View File

@ -67,7 +67,7 @@ public class TableAddOrDropColumnsInfoTest {
List<Index> indexes = Lists.newArrayList(
new Index(0, "index", Lists.newArrayList("testCol1"), IndexDef.IndexType.BITMAP, null, "xxxxxx"));
TableAddOrDropColumnsInfo tableAddOrDropColumnsInfo1 = new TableAddOrDropColumnsInfo(dbId, tableId,
TableAddOrDropColumnsInfo tableAddOrDropColumnsInfo1 = new TableAddOrDropColumnsInfo("", dbId, tableId,
indexSchemaMap, indexes, jobId);
String c1Json = GsonUtils.GSON.toJson(tableAddOrDropColumnsInfo1);