[feature-wip](auto-inc)(step-2) support auto-increment column for duplicate table (#19917)

This commit is contained in:
bobhan1
2023-07-20 18:03:39 +08:00
committed by GitHub
parent c31e826756
commit 367ad9164a
43 changed files with 11203 additions and 16 deletions

View File

@ -383,6 +383,7 @@ public class NativeInsertStmt extends InsertStmt {
slotDesc.setType(col.getType());
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
slotDesc.setAutoInc(col.isAutoInc());
}
} else if (targetTable instanceof MysqlTable || targetTable instanceof OdbcTable
|| targetTable instanceof JdbcTable) {

View File

@ -68,6 +68,7 @@ public class SlotDescriptor {
// If set to false, this slot is ignored during materialization.
// Used as an optimization to read less data and use less memory.
private boolean needMaterialize = true;
private boolean isAutoInc = false;
public SlotDescriptor(SlotId id, TupleDescriptor parent) {
this.id = id;
@ -154,6 +155,14 @@ public class SlotDescriptor {
isMaterialized = value;
}
/**
 * Reports whether this slot is flagged as an auto-increment slot.
 *
 * @return the current value of the {@code isAutoInc} flag (defaults to {@code false})
 */
public boolean isAutoInc() {
    return this.isAutoInc;
}
/**
 * Sets the auto-increment flag of this slot.
 *
 * @param isAutoInc new value of the flag
 */
public void setAutoInc(final boolean isAutoInc) {
    this.isAutoInc = isAutoInc;
}
public void materializeSrcExpr() {
if (sourceExprs == null) {
return;
@ -289,6 +298,7 @@ public class SlotDescriptor {
byteOffset, 0, getIsNullable() ? 0 : -1, ((column != null) ? column.getName() : ""), slotIdx,
isMaterialized);
tSlotDescriptor.setNeedMaterialize(needMaterialize);
tSlotDescriptor.setIsAutoIncrement(isAutoInc);
if (column != null) {
LOG.debug("column name:{}, column unique id:{}", column.getName(), column.getUniqueId());
tSlotDescriptor.setColUniqueId(column.getUniqueId());
@ -303,7 +313,8 @@ public class SlotDescriptor {
String parentTupleId = (parent == null) ? "null" : parent.getId().toString();
return MoreObjects.toStringHelper(this).add("id", id.asInt()).add("parent", parentTupleId).add("col", colStr)
.add("type", typeStr).add("materialized", isMaterialized).add("byteSize", byteSize)
.add("byteOffset", byteOffset).add("slotIdx", slotIdx).add("nullable", getIsNullable()).toString();
.add("byteOffset", byteOffset).add("slotIdx", slotIdx).add("nullable", getIsNullable())
.add("isAutoIncrement", isAutoInc).toString();
}
@Override
@ -319,6 +330,7 @@ public class SlotDescriptor {
.append(", colUniqueId=").append(column == null ? "null" : column.getUniqueId())
.append(", type=").append(type == null ? "null" : type.toSql())
.append(", nullable=").append(isNullable)
.append(", isAutoIncrement=").append(isAutoInc)
.append("}")
.toString();
}

View File

@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Hands out ranges of auto-increment ids for a single column of a table.
 *
 * <p>Ids are reserved in batches: whenever a requested range would pass {@code batchEndId},
 * a new batch end (a multiple of {@link #BATCH_ID_INTERVAL}) is written to the edit log before
 * the range is returned. On replay, {@link #applyChange(long, long)} moves both {@code nextId}
 * and {@code batchEndId} to the logged batch end, so ids handed out after a replay start at the
 * last persisted batch end.
 *
 * <p>All state-changing methods are {@code synchronized} on this instance.
 */
public class AutoIncrementGenerator implements Writable {
    private static final Logger LOG = LogManager.getLogger(AutoIncrementGenerator.class);

    // NOTE(review): not referenced inside this class; nextId starts at the Java default (0).
    public static final long NEXT_ID_INIT_VALUE = 1;
    // _MIN_BATCH_SIZE = 4064 in load task
    private static final long BATCH_ID_INTERVAL = 50000;

    @SerializedName(value = "dbId")
    private Long dbId;
    @SerializedName(value = "tableId")
    private Long tableId;
    @SerializedName(value = "columnId")
    private Long columnId;
    // Next id that has not been handed out yet.
    @SerializedName(value = "nextId")
    private long nextId;
    // End of the id batch that has already been recorded in the edit log.
    @SerializedName(value = "batchEndId")
    private long batchEndId;

    // Not serialized; must be injected with setEditLog() before ranges are requested.
    private EditLog editLog;

    /** No-arg constructor, needed for Gson deserialization in {@link #read(DataInput)}. */
    public AutoIncrementGenerator() {
    }

    /**
     * Creates a generator bound to one column.
     *
     * @param dbId id of the database owning the table
     * @param tableId id of the table owning the column
     * @param columnId unique id of the auto-increment column
     */
    public AutoIncrementGenerator(long dbId, long tableId, long columnId) {
        this.dbId = dbId;
        this.tableId = tableId;
        this.columnId = columnId;
    }

    /**
     * Injects the edit log used to persist batch reservations.
     *
     * @param editLog edit log to write {@link AutoIncrementIdUpdateLog} entries to
     */
    public void setEditLog(EditLog editLog) {
        this.editLog = editLog;
    }

    /**
     * Applies a persisted batch reservation. The change is ignored when it targets another
     * column or does not advance the current batch end.
     *
     * @param columnId unique id of the column the reservation was logged for
     * @param batchNextId the logged batch end; becomes both the next id and the batch end
     */
    public synchronized void applyChange(long columnId, long batchNextId) {
        if (ownsColumn(columnId) && batchEndId < batchNextId) {
            nextId = batchNextId;
            batchEndId = batchNextId;
        }
    }

    /**
     * Reserves {@code length} consecutive ids for the given column.
     *
     * @param columnId unique id of the auto-increment column; must match this generator's column
     * @param length number of ids requested; must not be negative
     * @param lowerBound smallest acceptable start id; the range starts at
     *                   {@code max(nextId, lowerBound)}
     * @return a pair of (first id of the range, length of the range)
     * @throws UserException if the column does not match, {@code length} is negative, or the
     *                       requested range would overflow {@code long}
     * @throws IllegalStateException if a new batch must be persisted but no edit log was set
     */
    public synchronized Pair<Long, Long> getAutoIncrementRange(long columnId,
            long length, long lowerBound) throws UserException {
        LOG.info("[getAutoIncrementRange request][col:{}][length:{}], [{}]", columnId, length, this.columnId);
        if (!ownsColumn(columnId)) {
            throw new UserException("column doesn't exist, columnId=" + columnId);
        }
        // A negative length would move nextId backwards and hand out already-used ids.
        if (length < 0) {
            throw new UserException("invalid auto-increment range length=" + length
                    + ", columnId=" + columnId);
        }

        long startId = Math.max(nextId, lowerBound);
        final long endId;
        try {
            endId = Math.addExact(startId, length);
        } catch (ArithmeticException e) {
            throw new UserException("auto-increment id overflow, columnId=" + columnId
                    + ", startId=" + startId + ", length=" + length);
        }

        if (endId > batchEndId) {
            // Validate and persist BEFORE touching in-memory state, so a failure here does not
            // leave the generator with a reservation that was never written to the edit log.
            Preconditions.checkState(editLog != null);
            final long newBatchEndId;
            try {
                newBatchEndId = Math.multiplyExact(endId / BATCH_ID_INTERVAL + 1, BATCH_ID_INTERVAL);
            } catch (ArithmeticException e) {
                throw new UserException("auto-increment id overflow, columnId=" + columnId
                        + ", startId=" + startId + ", length=" + length);
            }
            AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId, tableId, columnId, newBatchEndId);
            editLog.logUpdateAutoIncrementId(info);
            batchEndId = newBatchEndId;
        }
        nextId = endId;

        LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length);
        return Pair.of(startId, length);
    }

    /** Returns true when this generator is bound to {@code columnId}; null-safe after Gson load. */
    private boolean ownsColumn(long columnId) {
        return this.columnId != null && this.columnId == columnId;
    }

    /** Serializes this generator as a JSON string (fields annotated with {@code @SerializedName}). */
    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, GsonUtils.GSON.toJson(this));
    }

    /**
     * Reads a generator previously written by {@link #write(DataOutput)}.
     * The edit log is not restored here; callers must invoke {@link #setEditLog(EditLog)}.
     */
    public static AutoIncrementGenerator read(DataInput in) throws IOException {
        return GsonUtils.GSON.fromJson(Text.readString(in), AutoIncrementGenerator.class);
    }
}

View File

@ -508,6 +508,7 @@ public class Column implements Writable, GsonPostProcessable {
tColumn.setIsKey(this.isKey);
tColumn.setIsAllowNull(this.isAllowNull);
tColumn.setIsAutoIncrement(this.isAutoInc);
// keep compatibility
tColumn.setDefaultValue(this.realDefaultValue == null ? this.defaultValue : this.realDefaultValue);
tColumn.setVisible(visible);

View File

@ -171,6 +171,7 @@ import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.AlterMultiMaterializedView;
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.BinlogGcInfo;
@ -5367,4 +5368,8 @@ public class Env {
queryStats.clear(info);
editLog.logCleanQueryStats(info);
}
/**
 * Replays an auto-increment id update by delegating to the internal catalog.
 *
 * @param log the journal entry to replay
 * @throws Exception whatever the internal catalog's replay raises
 */
public void replayAutoIncrementIdUpdateLog(AutoIncrementIdUpdateLog log) throws Exception {
    this.getInternalCatalog().replayAutoIncrementIdUpdateLog(log);
}
}

View File

@ -40,6 +40,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.DeepCopy;
@ -151,6 +152,8 @@ public class OlapTable extends Table {
private TableProperty tableProperty;
private AutoIncrementGenerator autoIncrementGenerator;
public OlapTable() {
// for persist
super(TableType.OLAP);
@ -1265,6 +1268,14 @@ public class OlapTable extends Table {
tableProperty.write(out);
}
// autoIncrementGenerator
if (autoIncrementGenerator == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
autoIncrementGenerator.write(out);
}
tempPartitions.write(out);
}
@ -1345,6 +1356,15 @@ public class OlapTable extends Table {
if (in.readBoolean()) {
tableProperty = TableProperty.read(in);
}
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_124) {
// autoIncrementGenerator
if (in.readBoolean()) {
autoIncrementGenerator = AutoIncrementGenerator.read(in);
autoIncrementGenerator.setEditLog(Env.getCurrentEnv().getEditLog());
}
}
if (isAutoBucket()) {
defaultDistributionInfo.markAutoBucket();
}
@ -2109,4 +2129,18 @@ public class OlapTable extends Table {
|| (getKeysType() == KeysType.UNIQUE_KEYS
&& getEnableUniqueKeyMergeOnWrite());
}
/**
 * Creates the auto-increment generator for the first auto-increment column found in
 * {@code fullSchema} and wires it to the current edit log. Does nothing when the schema
 * contains no auto-increment column.
 *
 * @param dbId id of the database this table belongs to
 */
public void initAutoIncrentGenerator(long dbId) {
    for (Column candidate : fullSchema) {
        if (!candidate.isAutoInc()) {
            continue;
        }
        // Only the first auto-increment column gets a generator; stop scanning afterwards.
        this.autoIncrementGenerator = new AutoIncrementGenerator(dbId, id, candidate.getUniqueId());
        this.autoIncrementGenerator.setEditLog(Env.getCurrentEnv().getEditLog());
        return;
    }
}
/**
 * Returns this table's auto-increment generator.
 *
 * @return the generator, or {@code null} if none has been created or loaded for this table
 */
public AutoIncrementGenerator getAutoIncrementGenerator() {
    return this.autoIncrementGenerator;
}
}

View File

@ -136,6 +136,7 @@ import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr;
import org.apache.doris.mtmv.MTMVJobFactory;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.DatabaseInfo;
import org.apache.doris.persist.DropDbInfo;
@ -2230,6 +2231,7 @@ public class InternalCatalog implements CatalogIf<Database> {
}
olapTable.initSchemaColumnUniqueId();
olapTable.initAutoIncrentGenerator(db.getId());
olapTable.rebuildFullSchema();
// analyze version info
@ -2965,4 +2967,10 @@ public class InternalCatalog implements CatalogIf<Database> {
/**
 * Returns a new set holding the databases currently present in {@code idToDb}.
 * The returned collection is a copy; changing it does not affect the catalog.
 */
public Collection<DatabaseIf> getAllDbs() {
    Collection<DatabaseIf> snapshot = new HashSet<>();
    snapshot.addAll(idToDb.values());
    return snapshot;
}
/**
 * Replays an auto-increment batch reservation onto the target table's generator.
 *
 * @param log journal entry carrying the db id, table id, column id and new batch end id
 * @throws MetaNotFoundException if the database or OLAP table cannot be found, or the table
 *                               has no auto-increment generator to apply the change to
 */
public void replayAutoIncrementIdUpdateLog(AutoIncrementIdUpdateLog log) throws MetaNotFoundException {
    Database db = getDbOrMetaException(log.getDbId());
    OlapTable olapTable = (OlapTable) db.getTableOrMetaException(log.getTableId(), TableType.OLAP);
    // The generator only exists for tables with an auto-increment column; fail with a
    // descriptive metadata error instead of an anonymous NullPointerException.
    if (olapTable.getAutoIncrementGenerator() == null) {
        throw new MetaNotFoundException("auto-increment generator not found, dbId=" + log.getDbId()
                + ", tableId=" + log.getTableId() + ", columnId=" + log.getColumnId());
    }
    olapTable.getAutoIncrementGenerator().applyChange(log.getColumnId(), log.getBatchEndId());
}
}

View File

@ -67,6 +67,7 @@ import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.AlterUserOperationLog;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.AnalyzeDeletionLog;
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.BarrierLog;
@ -817,6 +818,10 @@ public class JournalEntity implements Writable {
}
case OperationType.OP_DELETE_ANALYSIS_TASK: {
data = AnalyzeDeletionLog.read(in);
break;
}
case OperationType.OP_UPDATE_AUTO_INCREMENT_ID: {
data = AutoIncrementIdUpdateLog.read(in);
isRead = true;
break;
}

View File

@ -562,7 +562,7 @@ public class Load {
/*
* This function will do followings:
* 1. fill the column exprs if user does not specify any column or column mapping.
* 2. For not specified columns, check if they have default value.
* 2. For not specified columns, check if they have default value or they are auto-increment columns.
* 3. Add any shadow columns if have.
* 4. validate hadoop functions
* 5. init slot descs and expr map for load plan
@ -629,7 +629,7 @@ public class Load {
columnExprMap.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr());
}
// check default value
// check default value and auto-increment column
for (Column column : tbl.getBaseSchema()) {
String columnName = column.getName();
if (columnExprMap.containsKey(columnName)) {
@ -638,7 +638,9 @@ public class Load {
if (column.getDefaultValue() != null || column.isAllowNull() || isPartialUpdate) {
continue;
}
//continue;
if (column.isAutoInc()) {
continue;
}
throw new DdlException("Column has no default value. column: " + columnName);
}

View File

@ -0,0 +1,71 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Journal payload describing a new batch end id reserved for an auto-increment column.
 * Instances are serialized to and from JSON using the {@code @SerializedName} field names.
 */
public class AutoIncrementIdUpdateLog implements Writable {
    @SerializedName(value = "dbId")
    private Long dbId;

    @SerializedName(value = "tableId")
    private Long tableId;

    @SerializedName(value = "columnId")
    private Long columnId;

    @SerializedName(value = "batchEndId")
    private Long batchEndId;

    /**
     * @param dbId id of the database
     * @param tableId id of the table
     * @param columnId id of the auto-increment column
     * @param batchEndId the batch end id being recorded
     */
    public AutoIncrementIdUpdateLog(Long dbId, Long tableId, Long columnId, Long batchEndId) {
        this.dbId = dbId;
        this.tableId = tableId;
        this.columnId = columnId;
        this.batchEndId = batchEndId;
    }

    /** @return the database id */
    public Long getDbId() {
        return this.dbId;
    }

    /** @return the table id */
    public Long getTableId() {
        return this.tableId;
    }

    /** @return the column id */
    public Long getColumnId() {
        return this.columnId;
    }

    /** @return the batch end id, unboxed to a primitive {@code long} */
    public long getBatchEndId() {
        return this.batchEndId;
    }

    /**
     * Reads one entry that was written by {@link #write(DataOutput)}.
     *
     * @param in source to read the JSON string from
     * @return the deserialized entry
     * @throws IOException if reading the string fails
     */
    public static AutoIncrementIdUpdateLog read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, AutoIncrementIdUpdateLog.class);
    }

    /**
     * Writes this entry as a single JSON string.
     *
     * @param out destination to write to
     * @throws IOException if writing the string fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }
}

View File

@ -1052,6 +1052,10 @@ public class EditLog {
LOG.info("replay barrier");
break;
}
case OperationType.OP_UPDATE_AUTO_INCREMENT_ID: {
env.replayAutoIncrementIdUpdateLog((AutoIncrementIdUpdateLog) journal.getData());
break;
}
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@ -1835,4 +1839,8 @@ public class EditLog {
/**
 * Writes an {@code OP_BARRIER} entry with a fresh {@link BarrierLog} payload.
 *
 * @return the value returned by {@code logEdit} for this entry
 */
public long logBarrier() {
    BarrierLog barrier = new BarrierLog();
    return logEdit(OperationType.OP_BARRIER, barrier);
}
/**
 * Writes an {@code OP_UPDATE_AUTO_INCREMENT_ID} entry carrying the given payload.
 * The value returned by {@code logEdit} is intentionally discarded.
 *
 * @param log the auto-increment id update to persist
 */
public void logUpdateAutoIncrementId(AutoIncrementIdUpdateLog log) {
    this.logEdit(OperationType.OP_UPDATE_AUTO_INCREMENT_ID, log);
}
}

View File

@ -310,6 +310,8 @@ public class OperationType {
public static final short OP_BARRIER = 436;
// change an auto increment id for a column
public static final short OP_UPDATE_AUTO_INCREMENT_ID = 437;
/**
* Get opcode name by op code.

View File

@ -261,6 +261,11 @@ public class FileLoadScanNode extends FileScanNode {
} else {
if (column.isAllowNull()) {
expr = NullLiteral.create(column.getType());
} else if (column.isAutoInc()) {
// auto-increment column should be non-nullable
// however, here we use `NullLiteral` to indicate that a cell should
// be filled with generated value in `VOlapTableSink::_fill_auto_inc_cols()`
expr = NullLiteral.create(column.getType());
} else {
throw new AnalysisException("column has no source field, column=" + column.getName());
}

View File

@ -176,10 +176,18 @@ public class StreamLoadPlanner {
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
slotDesc.setAutoInc(col.isAutoInc());
SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
scanSlotDesc.setIsMaterialized(true);
scanSlotDesc.setColumn(col);
scanSlotDesc.setIsNullable(col.isAllowNull());
scanSlotDesc.setAutoInc(col.isAutoInc());
if (col.isAutoInc()) {
// An auto-increment column is itself non-nullable.
// However, the scan slot is marked nullable here so that a NULL cell can indicate
// that the value should be generated in `VOlapTableSink::_fill_auto_inc_cols()`.
scanSlotDesc.setIsNullable(true);
}
for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
try {
if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null
@ -378,10 +386,18 @@ public class StreamLoadPlanner {
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
slotDesc.setAutoInc(col.isAutoInc());
SlotDescriptor scanSlotDesc = descTable.addSlotDescriptor(scanTupleDesc);
scanSlotDesc.setIsMaterialized(true);
scanSlotDesc.setColumn(col);
scanSlotDesc.setIsNullable(col.isAllowNull());
scanSlotDesc.setAutoInc(col.isAutoInc());
if (col.isAutoInc()) {
// An auto-increment column is itself non-nullable.
// However, the scan slot is marked nullable here so that a NULL cell can indicate
// that the value should be generated in `VOlapTableSink::_fill_auto_inc_cols()`.
scanSlotDesc.setIsNullable(true);
}
for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
try {
if (!importColumnDesc.isColumn() && importColumnDesc.getColumnName() != null

View File

@ -28,6 +28,7 @@ import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TypeDef;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.Snapshot;
import org.apache.doris.catalog.AutoIncrementGenerator;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
@ -86,6 +87,8 @@ import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.FrontendServiceVersion;
import org.apache.doris.thrift.TAddColumnsRequest;
import org.apache.doris.thrift.TAddColumnsResult;
import org.apache.doris.thrift.TAutoIncrementRangeRequest;
import org.apache.doris.thrift.TAutoIncrementRangeResult;
import org.apache.doris.thrift.TBeginTxnRequest;
import org.apache.doris.thrift.TBeginTxnResult;
import org.apache.doris.thrift.TBinlog;
@ -2322,6 +2325,41 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return result;
}
/**
 * RPC entry point: reserves a range of auto-increment ids for a column of an OLAP table.
 *
 * <p>Errors are never thrown to the caller; they are reported through the status of the
 * returned result: {@code ANALYSIS_ERROR} for {@link UserException}s (missing database/table,
 * table without an auto-increment generator, or generator-level rejections) and
 * {@code INTERNAL_ERROR} for anything else.
 *
 * @param request carries db id, table id, column id, requested length and an optional lower bound
 * @return result holding the status and, on success, the start and length of the reserved range
 */
@Override
public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeRequest request) {
    String clientAddr = getClientAddrAsString();
    LOG.debug("receive get auto-increment range request: {}, backend: {}", request, clientAddr);

    TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
    TStatus status = new TStatus(TStatusCode.OK);
    result.setStatus(status);
    try {
        Env env = Env.getCurrentEnv();
        Database db = env.getInternalCatalog().getDbOrMetaException(request.getDbId());
        OlapTable olapTable = (OlapTable) db.getTableOrMetaException(request.getTableId(), TableType.OLAP);
        AutoIncrementGenerator autoIncrementGenerator = olapTable.getAutoIncrementGenerator();
        // Tables without an auto-increment column have no generator; report that explicitly
        // instead of letting a NullPointerException surface as an opaque internal error.
        if (autoIncrementGenerator == null) {
            throw new UserException("table has no auto-increment generator, tableId="
                    + request.getTableId());
        }
        long columnId = request.getColumnId();
        long length = request.getLength();
        // -1 means "no lower bound requested".
        long lowerBound = request.isSetLowerBound() ? request.getLowerBound() : -1;
        Pair<Long, Long> range = autoIncrementGenerator.getAutoIncrementRange(columnId, length, lowerBound);
        result.setStart(range.first);
        result.setLength(range.second);
    } catch (UserException e) {
        LOG.warn("failed to get auto-increment range of column {}: {}", request.getColumnId(), e.getMessage());
        status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
        status.addToErrorMsgs(e.getMessage());
    } catch (Throwable e) {
        LOG.warn("catch unknown result.", e);
        status.setStatusCode(TStatusCode.INTERNAL_ERROR);
        status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
    }
    return result;
}
public TGetBinlogResult getBinlog(TGetBinlogRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.debug("receive get binlog request: {}", request);