[fix](editLog) add sufficient replay logic and edit log for altering light schema change (#18746)
This commit is contained in:
@ -28,7 +28,7 @@ import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
|
||||
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
|
||||
import org.apache.doris.proto.InternalService.PFetchColIdsRequest;
|
||||
import org.apache.doris.proto.InternalService.PFetchColIdsRequest.Builder;
|
||||
import org.apache.doris.proto.InternalService.PFetchColIdsRequest.PFetchColIdParam;
|
||||
@ -42,6 +42,8 @@ import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
@ -59,13 +61,15 @@ import java.util.concurrent.TimeoutException;
|
||||
/**
|
||||
* For alter light_schema_change table property
|
||||
*/
|
||||
public class AlterLSCHelper {
|
||||
public class AlterLightSchChangeHelper {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(AlterLightSchChangeHelper.class);
|
||||
|
||||
private final Database db;
|
||||
|
||||
private final OlapTable olapTable;
|
||||
|
||||
public AlterLSCHelper(Database db, OlapTable olapTable) {
|
||||
public AlterLightSchChangeHelper(Database db, OlapTable olapTable) {
|
||||
this.db = db;
|
||||
this.olapTable = olapTable;
|
||||
}
|
||||
@ -77,9 +81,10 @@ public class AlterLSCHelper {
|
||||
*/
|
||||
public void enableLightSchemaChange() throws DdlException {
|
||||
final Map<Long, PFetchColIdsRequest> params = initParams();
|
||||
final PFetchColIdsResponse response = callForColumnIds(params);
|
||||
updateTableMeta(response);
|
||||
modifyTableProperty();
|
||||
final AlterLightSchemaChangeInfo info = callForColumnsInfo(params);
|
||||
updateTableMeta(info);
|
||||
Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info);
|
||||
LOG.info("successfully enable `light_schema_change`");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -133,10 +138,12 @@ public class AlterLSCHelper {
|
||||
|
||||
/**
|
||||
* @param beIdToRequest rpc param for corresponding BEs
|
||||
* @return indexIds to each tablet schema info which consists of columnName to corresponding column unique id pairs
|
||||
* @return contains indexIds to each tablet schema info which consists of columnName to corresponding
|
||||
* column unique id pairs
|
||||
* @throws DdlException as a wrapper for rpc failures
|
||||
*/
|
||||
private PFetchColIdsResponse callForColumnIds(Map<Long, PFetchColIdsRequest> beIdToRequest) throws DdlException {
|
||||
private AlterLightSchemaChangeInfo callForColumnsInfo(Map<Long, PFetchColIdsRequest> beIdToRequest)
|
||||
throws DdlException {
|
||||
final List<Future<PFetchColIdsResponse>> futureList = new ArrayList<>();
|
||||
// start a rpc in a pipeline way
|
||||
try {
|
||||
@ -173,14 +180,14 @@ public class AlterLSCHelper {
|
||||
} catch (TimeoutException e) {
|
||||
throw new DdlException("fetch columnIds RPC result timeout", e);
|
||||
}
|
||||
return compactToUniqResp(resultList);
|
||||
return compactToAlterLscInfo(resultList);
|
||||
}
|
||||
|
||||
/**
|
||||
* Since the result collected from several BEs may contain repeated indexes in distributed storage scenarios,
|
||||
* we should do consistency check for the result for the same index, and get the unique result.
|
||||
*/
|
||||
private PFetchColIdsResponse compactToUniqResp(List<PFetchColIdsResponse> resultList) {
|
||||
private AlterLightSchemaChangeInfo compactToAlterLscInfo(List<PFetchColIdsResponse> resultList) {
|
||||
final PFetchColIdsResponse.Builder builder = PFetchColIdsResponse.newBuilder();
|
||||
Map<Long, Map<String, Integer>> indexIdToTabletInfo = new HashMap<>();
|
||||
resultList.forEach(response -> {
|
||||
@ -197,27 +204,25 @@ public class AlterLSCHelper {
|
||||
"index: " + indexId + "got inconsistent schema in storage");
|
||||
}
|
||||
});
|
||||
return builder.build();
|
||||
return new AlterLightSchemaChangeInfo(db.getId(), olapTable.getId(), indexIdToTabletInfo);
|
||||
}
|
||||
|
||||
private void updateTableMeta(PFetchColIdsResponse response) throws DdlException {
|
||||
Preconditions.checkState(response.isInitialized());
|
||||
public void updateTableMeta(AlterLightSchemaChangeInfo info) throws DdlException {
|
||||
Preconditions.checkNotNull(info, "passed in info should be not null");
|
||||
// update index-meta once and for all
|
||||
// schema pair: <maxColId, columns>
|
||||
final List<Pair<Integer, List<Column>>> schemaPairs = new ArrayList<>();
|
||||
final List<Long> indexIds = new ArrayList<>();
|
||||
response.getEntriesList().forEach(entry -> {
|
||||
final long indexId = entry.getIndexId();
|
||||
info.getIndexIdToColumnInfo().forEach((indexId, colNameToId) -> {
|
||||
final List<Column> columns = olapTable.getSchemaByIndexId(indexId, true);
|
||||
final Map<String, Integer> colNameToId = entry.getColNameToIdMap();
|
||||
Preconditions.checkState(columns.size() == colNameToId.size(),
|
||||
"size mismatch for columns meta from BE");
|
||||
"size mismatch for original columns meta and that in change info");
|
||||
int maxColId = Column.COLUMN_UNIQUE_ID_INIT_VALUE;
|
||||
final List<Column> newSchema = new ArrayList<>();
|
||||
for (Column column : columns) {
|
||||
final String columnName = column.getName();
|
||||
final int columnId = Preconditions.checkNotNull(colNameToId.get(columnName),
|
||||
"failed to fetch column id of column:{" + columnName + "} from BE");
|
||||
"failed to fetch column id of column:{" + columnName + "}");
|
||||
final Column newColumn = new Column(column);
|
||||
newColumn.setUniqueId(columnId);
|
||||
newSchema.add(newColumn);
|
||||
@ -226,7 +231,8 @@ public class AlterLSCHelper {
|
||||
schemaPairs.add(Pair.of(maxColId, newSchema));
|
||||
indexIds.add(indexId);
|
||||
});
|
||||
Preconditions.checkState(schemaPairs.size() == indexIds.size());
|
||||
Preconditions.checkState(schemaPairs.size() == indexIds.size(),
|
||||
"impossible state, size of schemaPairs and indexIds should be the same");
|
||||
// update index-meta once and for all
|
||||
try {
|
||||
for (int i = 0; i < indexIds.size(); i++) {
|
||||
@ -238,14 +244,8 @@ public class AlterLSCHelper {
|
||||
} catch (IOException e) {
|
||||
throw new DdlException("fail to reset index schema", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void modifyTableProperty() {
|
||||
// write table property
|
||||
olapTable.setEnableLightSchemaChange(true);
|
||||
//write edit log
|
||||
ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), olapTable.getId(),
|
||||
olapTable.getTableProperty().getProperties());
|
||||
Env.getCurrentEnv().getEditLog().logAlterLightSchemaChange(info);
|
||||
LOG.info("successfully update table meta for `light_schema_change`");
|
||||
}
|
||||
}
|
||||
@ -73,6 +73,7 @@ import org.apache.doris.common.util.ListComparator;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
|
||||
import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
|
||||
import org.apache.doris.persist.TableAddOrDropColumnsInfo;
|
||||
import org.apache.doris.persist.TableAddOrDropInvertedIndicesInfo;
|
||||
@ -1890,8 +1891,22 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
|
||||
|
||||
private void enableLightSchemaChange(Database db, OlapTable olapTable) throws DdlException {
|
||||
final AlterLSCHelper alterLSCHelper = new AlterLSCHelper(db, olapTable);
|
||||
alterLSCHelper.enableLightSchemaChange();
|
||||
final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable);
|
||||
alterLightSchChangeHelper.enableLightSchemaChange();
|
||||
}
|
||||
|
||||
public void replayAlterLightSchChange(AlterLightSchemaChangeInfo info) throws MetaNotFoundException {
|
||||
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrMetaException(info.getDbId());
|
||||
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
|
||||
olapTable.writeLock();
|
||||
final AlterLightSchChangeHelper alterLightSchChangeHelper = new AlterLightSchChangeHelper(db, olapTable);
|
||||
try {
|
||||
alterLightSchChangeHelper.updateTableMeta(info);
|
||||
} catch (DdlException e) {
|
||||
LOG.warn("failed to replay alter light schema change", e);
|
||||
} finally {
|
||||
olapTable.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -107,9 +107,6 @@ public class TableProperty implements Writable {
|
||||
buildInMemory();
|
||||
buildStoragePolicy();
|
||||
break;
|
||||
case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE:
|
||||
buildEnableLightSchemaChange();
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
@ -61,6 +61,7 @@ import org.apache.doris.mtmv.metadata.DropMTMVTask;
|
||||
import org.apache.doris.mtmv.metadata.MTMVJob;
|
||||
import org.apache.doris.mtmv.metadata.MTMVTask;
|
||||
import org.apache.doris.mysql.privilege.UserPropertyInfo;
|
||||
import org.apache.doris.persist.AlterLightSchemaChangeInfo;
|
||||
import org.apache.doris.persist.AlterMultiMaterializedView;
|
||||
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
|
||||
import org.apache.doris.persist.AlterUserOperationLog;
|
||||
@ -624,7 +625,6 @@ public class JournalEntity implements Writable {
|
||||
}
|
||||
case OperationType.OP_DYNAMIC_PARTITION:
|
||||
case OperationType.OP_MODIFY_IN_MEMORY:
|
||||
case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE:
|
||||
case OperationType.OP_MODIFY_REPLICATION_NUM: {
|
||||
data = ModifyTablePropertyOperationLog.read(in);
|
||||
isRead = true;
|
||||
@ -807,6 +807,11 @@ public class JournalEntity implements Writable {
|
||||
isRead = true;
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: {
|
||||
data = AlterLightSchemaChangeInfo.read(in);
|
||||
isRead = true;
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
IOException e = new IOException();
|
||||
LOG.error("UNKNOWN Operation Type {}", opCode, e);
|
||||
|
||||
@ -0,0 +1,68 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.persist;
|
||||
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
public class AlterLightSchemaChangeInfo implements Writable {
|
||||
|
||||
@SerializedName(value = "dbId")
|
||||
private Long dbId;
|
||||
|
||||
@SerializedName(value = "tableId")
|
||||
private Long tableId;
|
||||
|
||||
@SerializedName("indexIdToColumnInfo")
|
||||
private Map<Long, Map<String, Integer>> indexIdToColumnInfo;
|
||||
|
||||
public AlterLightSchemaChangeInfo(long dbId, long tableId, Map<Long, Map<String, Integer>> info) {
|
||||
this.dbId = dbId;
|
||||
this.tableId = tableId;
|
||||
this.indexIdToColumnInfo = info;
|
||||
}
|
||||
|
||||
public Long getDbId() {
|
||||
return dbId;
|
||||
}
|
||||
|
||||
public Long getTableId() {
|
||||
return tableId;
|
||||
}
|
||||
|
||||
public Map<Long, Map<String, Integer>> getIndexIdToColumnInfo() {
|
||||
return indexIdToColumnInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
Text.writeString(out, GsonUtils.GSON.toJson(this));
|
||||
}
|
||||
|
||||
public static AlterLightSchemaChangeInfo read(DataInput in) throws IOException {
|
||||
return GsonUtils.GSON.fromJson(Text.readString(in), AlterLightSchemaChangeInfo.class);
|
||||
}
|
||||
}
|
||||
@ -765,7 +765,6 @@ public class EditLog {
|
||||
}
|
||||
case OperationType.OP_DYNAMIC_PARTITION:
|
||||
case OperationType.OP_MODIFY_IN_MEMORY:
|
||||
case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE:
|
||||
case OperationType.OP_MODIFY_REPLICATION_NUM: {
|
||||
ModifyTablePropertyOperationLog log = (ModifyTablePropertyOperationLog) journal.getData();
|
||||
env.replayModifyTableProperty(opCode, log);
|
||||
@ -897,6 +896,11 @@ public class EditLog {
|
||||
env.getSchemaChangeHandler().replayModifyTableLightSchemaChange(info);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE: {
|
||||
final AlterLightSchemaChangeInfo info = (AlterLightSchemaChangeInfo) journal.getData();
|
||||
env.getSchemaChangeHandler().replayAlterLightSchChange(info);
|
||||
break;
|
||||
}
|
||||
case OperationType.OP_MODIFY_TABLE_ADD_OR_DROP_INVERTED_INDICES: {
|
||||
final TableAddOrDropInvertedIndicesInfo info =
|
||||
(TableAddOrDropInvertedIndicesInfo) journal.getData();
|
||||
@ -1606,7 +1610,7 @@ public class EditLog {
|
||||
logEdit(OperationType.OP_MODIFY_IN_MEMORY, info);
|
||||
}
|
||||
|
||||
public void logAlterLightSchemaChange(ModifyTablePropertyOperationLog info) {
|
||||
public void logAlterLightSchemaChange(AlterLightSchemaChangeInfo info) {
|
||||
logEdit(OperationType.OP_ALTER_LIGHT_SCHEMA_CHANGE, info);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user