[Feature](merge-on-write)Support ignore mode for merge-on-write unique table (#21773)

This commit is contained in:
bobhan1
2023-09-14 18:03:51 +08:00
committed by GitHub
parent 68b13ab50f
commit 3ee89aea35
34 changed files with 479 additions and 62 deletions

View File

@ -887,7 +887,7 @@ nonterminal ParseNode load_property;
nonterminal List<ParseNode> opt_load_property_list;
// Boolean
nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate, opt_local, opt_is_auto_inc;
nonterminal Boolean opt_negative, opt_is_allow_null, opt_is_key, opt_read_only, opt_aggregate, opt_local, opt_is_auto_inc, opt_is_insert_ignore;
nonterminal String opt_from_rollup, opt_to_rollup;
nonterminal ColumnPosition opt_col_pos;
@ -3718,6 +3718,17 @@ opt_is_auto_inc ::=
RESULT = true;
:}
;
opt_is_insert_ignore ::=
{:
RESULT = false;
:}
| KW_IGNORE
{:
RESULT = true;
:}
;
opt_comment ::=
/* empty */
{:
@ -4805,16 +4816,16 @@ insert_overwrite_stmt ::=
// Insert statement
insert_stmt ::=
KW_INSERT KW_INTO insert_target:target opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source
KW_INSERT opt_is_insert_ignore:is_insert_ignore KW_INTO insert_target:target opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source
{:
RESULT = new NativeInsertStmt(target, label, cols, source, hints);
RESULT = new NativeInsertStmt(target, label, cols, source, hints, is_insert_ignore);
:}
// TODO(zc) add default value for SQL-2003
// | KW_INSERT KW_INTO insert_target:target KW_DEFAULT KW_VALUES
| /* used for group commit */
KW_INSERT KW_INTO INTEGER_LITERAL:table_id opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source
KW_INSERT opt_is_insert_ignore:is_insert_ignore KW_INTO INTEGER_LITERAL:table_id opt_with_label:label opt_col_list:cols opt_plan_hints:hints insert_source:source
{:
RESULT = new NativeInsertStmt(table_id, label, cols, source, hints);
RESULT = new NativeInsertStmt(table_id, label, cols, source, hints, is_insert_ignore);
:}
;

View File

@ -201,7 +201,8 @@ public class DeleteStmt extends DdlStmt {
cols,
new InsertSource(selectStmt),
null,
isPartialUpdate);
isPartialUpdate,
false);
}
private void analyzeTargetTable(Analyzer analyzer) throws UserException {

View File

@ -166,6 +166,8 @@ public class NativeInsertStmt extends InsertStmt {
// true if be generates an insert from group commit tvf stmt and executes to load data
public boolean isInnerGroupCommit = false;
private boolean isInsertIgnore = false;
public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints) {
super(new LabelName(null, label), null, null);
@ -185,6 +187,26 @@ public class NativeInsertStmt extends InsertStmt {
this.tableId = tableId;
}
public NativeInsertStmt(long tableId, String label, List<String> cols, InsertSource source,
List<String> hints, boolean isInsertIgnore) {
this(new InsertTarget(new TableName(null, null, null), null), label, cols, source, hints, isInsertIgnore);
this.tableId = tableId;
}
public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints, boolean isInsertIgnore) {
super(new LabelName(null, label), null, null);
this.tblName = target.getTblName();
this.targetPartitionNames = target.getPartitionNames();
this.label = new LabelName(null, label);
this.queryStmt = source.getQueryStmt();
this.planHints = hints;
this.isInsertIgnore = isInsertIgnore;
this.targetColumnNames = cols;
this.isValuesOrConstantSelect = (queryStmt instanceof SelectStmt
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
}
// Ctor for CreateTableAsSelectStmt and InsertOverwriteTableStmt
public NativeInsertStmt(TableName name, PartitionNames targetPartitionNames, LabelName label,
QueryStmt queryStmt, List<String> planHints, List<String> targetColumnNames) {
@ -199,10 +221,11 @@ public class NativeInsertStmt extends InsertStmt {
}
public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints, boolean isPartialUpdate) {
List<String> hints, boolean isPartialUpdate, boolean isInsertIgnore) {
this(target, label, cols, source, hints);
this.isPartialUpdate = isPartialUpdate;
this.partialUpdateCols.addAll(cols);
this.isInsertIgnore = isInsertIgnore;
}
public boolean isValuesOrConstantSelect() {
@ -382,7 +405,8 @@ public class NativeInsertStmt extends InsertStmt {
OlapTableSink sink = (OlapTableSink) dataSink;
TUniqueId loadId = analyzer.getContext().queryId();
int sendBatchParallelism = analyzer.getContext().getSessionVariable().getSendBatchParallelism();
sink.init(loadId, transactionId, db.getId(), timeoutSecond, sendBatchParallelism, false, false);
sink.init(loadId, transactionId, db.getId(), timeoutSecond,
sendBatchParallelism, false, false, isInsertIgnore);
}
}

View File

@ -125,7 +125,8 @@ public class UpdateStmt extends DdlStmt {
cols,
new InsertSource(selectStmt),
null,
isPartialUpdate);
isPartialUpdate,
false);
}
private void analyzeTargetTable(Analyzer analyzer) throws UserException {

View File

@ -193,8 +193,9 @@ public class LoadingTaskPlanner {
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds,
Config.enable_single_replica_load);
olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, false, strictMode);
olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, false, strictMode, false);
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
olapTableSink.complete(analyzer);
// 3. Plan fragment

View File

@ -150,7 +150,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
sink.init(ctx.queryId(), txn.getTxnId(),
physicalOlapTableSink.getDatabase().getId(),
ctx.getExecTimeout(),
ctx.getSessionVariable().getSendBatchParallelism(), false, false);
ctx.getSessionVariable().getSendBatchParallelism(), false, false, false);
sink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(

View File

@ -26,6 +26,7 @@ import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
@ -104,6 +105,8 @@ public class OlapTableSink extends DataSink {
private boolean isStrictMode = false;
private boolean isUniqueKeyIgnoreMode = false;
public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
boolean singleReplicaLoad) {
this.dstTable = dstTable;
@ -113,7 +116,7 @@ public class OlapTableSink extends DataSink {
}
public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeoutS, int sendBatchParallelism,
boolean loadToSingleTablet, boolean isStrictMode) throws AnalysisException {
boolean loadToSingleTablet, boolean isStrictMode, boolean isUniqueKeyIgnoreMode) throws AnalysisException {
TOlapTableSink tSink = new TOlapTableSink();
tSink.setLoadId(loadId);
tSink.setTxnId(txnId);
@ -121,6 +124,7 @@ public class OlapTableSink extends DataSink {
tSink.setLoadChannelTimeoutS(loadChannelTimeoutS);
tSink.setSendBatchParallelism(sendBatchParallelism);
this.isStrictMode = isStrictMode;
this.isUniqueKeyIgnoreMode = isUniqueKeyIgnoreMode;
if (loadToSingleTablet && !(dstTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo)) {
throw new AnalysisException(
"if load_to_single_tablet set to true," + " the olap table must be with random distribution");
@ -186,6 +190,17 @@ public class OlapTableSink extends DataSink {
}
tSink.setWriteSingleReplica(singleReplicaLoad);
tSink.setNodesInfo(createPaloNodesInfo());
if (isUniqueKeyIgnoreMode) {
if (dstTable.getKeysType() != KeysType.UNIQUE_KEYS || !dstTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("ignore mode can only be enabled if the target table is "
+ "a unique table with merge-on-write enabled.");
} else if (isPartialUpdate) {
throw new UserException("ignore mode can't be used in partial update.");
} else if (dstTable.hasSequenceCol()) {
throw new UserException("ignore mode can't be used if the target table has sequence column, "
+ "but table[" + dstTable.getName() + "] has sequnce column.");
}
}
}
@Override
@ -221,6 +236,7 @@ public class OlapTableSink extends DataSink {
schemaParam.setTableId(table.getId());
schemaParam.setVersion(table.getIndexMetaByIndexId(table.getBaseIndexId()).getSchemaVersion());
schemaParam.setIsStrictMode(isStrictMode);
schemaParam.setIsUniqueKeyIgnoreMode(isUniqueKeyIgnoreMode);
schemaParam.tuple_desc = tupleDescriptor.toThrift();
for (SlotDescriptor slotDesc : tupleDescriptor.getSlots()) {

View File

@ -256,8 +256,8 @@ public class StreamLoadPlanner {
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds,
Config.enable_single_replica_load);
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(),
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), taskInfo.isIgnoreMode());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
olapTableSink.complete(analyzer);
@ -465,8 +465,8 @@ public class StreamLoadPlanner {
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds,
Config.enable_single_replica_load);
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(),
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode(), taskInfo.isIgnoreMode());
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
olapTableSink.complete(analyzer);

View File

@ -125,6 +125,10 @@ public interface LoadTaskInfo {
return false;
}
default boolean isIgnoreMode() {
return false;
}
class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;

View File

@ -84,6 +84,7 @@ public class StreamLoadTask implements LoadTaskInfo {
private List<String> hiddenColumns;
private boolean trimDoubleQuotes = false;
private boolean isPartialUpdate = false;
private boolean isIgnoreMode = false;
private int skipLines = 0;
private boolean enableProfile = false;
@ -308,6 +309,10 @@ public class StreamLoadTask implements LoadTaskInfo {
this.memtableOnSinkNode = memtableOnSinkNode;
}
public boolean isIgnoreMode() {
return isIgnoreMode;
}
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@ -444,6 +449,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetMemtableOnSinkNode()) {
this.memtableOnSinkNode = request.isMemtableOnSinkNode();
}
if (request.isSetIgnoreMode()) {
isIgnoreMode = request.isIgnoreMode();
}
}
// used for stream load

View File

@ -107,7 +107,7 @@ public class OlapTableSinkTest {
new DataProperty(DataProperty.DEFAULT_STORAGE_MEDIUM));
dstTable.getPartitionInfo().setIsMutable(partition.getId(), true);
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(2L), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false);
sink.complete(null);
LOG.info("sink is {}", sink.toThrift());
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
@ -144,7 +144,7 @@ public class OlapTableSinkTest {
};
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false);
try {
sink.complete(null);
} catch (UserException e) {
@ -169,7 +169,7 @@ public class OlapTableSinkTest {
};
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(unknownPartId), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false);
sink.complete(null);
LOG.info("sink is {}", sink.toThrift());
LOG.info("{}", sink.getExplainString("", TExplainLevel.NORMAL));
@ -206,7 +206,7 @@ public class OlapTableSinkTest {
};
OlapTableSink sink = new OlapTableSink(dstTable, tuple, Lists.newArrayList(p1.getId()), false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false);
sink.init(new TUniqueId(1, 2), 3, 4, 1000, 1, false, false, false);
try {
sink.complete(null);
} catch (UserException e) {