backport #32845 Co-authored-by: Xin Liao <liaoxinbit@126.com>
This commit is contained in:
@ -1038,13 +1038,16 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
}
|
||||
if (targetTable instanceof OlapTable) {
|
||||
OlapTableSink sink;
|
||||
final boolean enableSingleReplicaLoad =
|
||||
analyzer.getContext().getSessionVariable().isEnableMemtableOnSinkNode()
|
||||
? false : analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert();
|
||||
if (isGroupCommitStreamLoadSql) {
|
||||
sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple,
|
||||
targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(),
|
||||
targetPartitionIds, enableSingleReplicaLoad,
|
||||
ConnectContext.get().getSessionVariable().getGroupCommit(), 0);
|
||||
} else {
|
||||
sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
|
||||
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
|
||||
enableSingleReplicaLoad);
|
||||
}
|
||||
dataSink = sink;
|
||||
sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
|
||||
|
||||
@ -118,7 +118,7 @@ public class LoadLoadingTask extends LoadTask {
|
||||
this.loadId = loadId;
|
||||
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups,
|
||||
strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism,
|
||||
this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink);
|
||||
this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode);
|
||||
planner.plan(loadId, fileStatusList, fileNum);
|
||||
}
|
||||
|
||||
|
||||
@ -73,6 +73,7 @@ public class LoadingTaskPlanner {
|
||||
private final int sendBatchParallelism;
|
||||
private final boolean useNewLoadScanNode;
|
||||
private final boolean singleTabletLoadPerSink;
|
||||
private final boolean enableMemtableOnSinkNode;
|
||||
private UserIdentity userInfo;
|
||||
// Something useful
|
||||
// ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase()
|
||||
@ -89,7 +90,7 @@ public class LoadingTaskPlanner {
|
||||
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
|
||||
boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism,
|
||||
int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo,
|
||||
boolean singleTabletLoadPerSink) {
|
||||
boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode) {
|
||||
this.loadJobId = loadJobId;
|
||||
this.txnId = txnId;
|
||||
this.dbId = dbId;
|
||||
@ -103,8 +104,9 @@ public class LoadingTaskPlanner {
|
||||
this.loadParallelism = loadParallelism;
|
||||
this.sendBatchParallelism = sendBatchParallelism;
|
||||
this.useNewLoadScanNode = useNewLoadScanNode;
|
||||
this.singleTabletLoadPerSink = singleTabletLoadPerSink;
|
||||
this.userInfo = userInfo;
|
||||
this.singleTabletLoadPerSink = singleTabletLoadPerSink;
|
||||
this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
|
||||
if (Env.getCurrentEnv().getAccessManager()
|
||||
.checkDbPriv(userInfo, InternalCatalog.INTERNAL_CATALOG_NAME,
|
||||
Env.getCurrentInternalCatalog().getDbNullable(dbId).getFullName(),
|
||||
@ -206,8 +208,10 @@ public class LoadingTaskPlanner {
|
||||
|
||||
// 2. Olap table sink
|
||||
List<Long> partitionIds = getAllPartitionIds();
|
||||
final boolean enableSingleReplicaLoad = this.enableMemtableOnSinkNode
|
||||
? false : Config.enable_single_replica_load;
|
||||
OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds,
|
||||
Config.enable_single_replica_load);
|
||||
enableSingleReplicaLoad);
|
||||
olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode);
|
||||
olapTableSink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateInputColumns);
|
||||
|
||||
|
||||
@ -261,15 +261,20 @@ public class StreamLoadPlanner {
|
||||
timeout *= 2;
|
||||
}
|
||||
|
||||
final boolean enableMemtableOnSinkNode =
|
||||
destTable.getTableProperty().getUseSchemaLightChange()
|
||||
? taskInfo.isMemtableOnSinkNode() : false;
|
||||
final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode
|
||||
? false : Config.enable_single_replica_load;
|
||||
// create dest sink
|
||||
List<Long> partitionIds = getAllPartitionIds();
|
||||
OlapTableSink olapTableSink;
|
||||
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
|
||||
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
|
||||
Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
|
||||
enableSingleReplicaLoad, ((StreamLoadTask) taskInfo).getGroupCommit(),
|
||||
taskInfo.getMaxFilterRatio());
|
||||
} else {
|
||||
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
|
||||
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, enableSingleReplicaLoad);
|
||||
}
|
||||
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(),
|
||||
taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
|
||||
@ -326,10 +331,7 @@ public class StreamLoadPlanner {
|
||||
queryOptions.setBeExecVersion(Config.be_exec_version);
|
||||
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
|
||||
queryOptions.setEnableProfile(taskInfo.getEnableProfile());
|
||||
boolean isEnableMemtableOnSinkNode =
|
||||
destTable.getTableProperty().getUseSchemaLightChange()
|
||||
? taskInfo.isMemtableOnSinkNode() : false;
|
||||
queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
|
||||
queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
|
||||
params.setQueryOptions(queryOptions);
|
||||
TQueryGlobals queryGlobals = new TQueryGlobals();
|
||||
queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()));
|
||||
@ -493,15 +495,20 @@ public class StreamLoadPlanner {
|
||||
timeout *= 2;
|
||||
}
|
||||
|
||||
final boolean enableMemtableOnSinkNode =
|
||||
destTable.getTableProperty().getUseSchemaLightChange()
|
||||
? taskInfo.isMemtableOnSinkNode() : false;
|
||||
final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode
|
||||
? false : Config.enable_single_replica_load;
|
||||
// create dest sink
|
||||
List<Long> partitionIds = getAllPartitionIds();
|
||||
OlapTableSink olapTableSink;
|
||||
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
|
||||
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
|
||||
Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
|
||||
enableSingleReplicaLoad, ((StreamLoadTask) taskInfo).getGroupCommit(),
|
||||
taskInfo.getMaxFilterRatio());
|
||||
} else {
|
||||
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
|
||||
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, enableSingleReplicaLoad);
|
||||
}
|
||||
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
|
||||
taskInfo.getSendBatchParallelism(), taskInfo.isLoadToSingleTablet(), taskInfo.isStrictMode());
|
||||
@ -560,10 +567,7 @@ public class StreamLoadPlanner {
|
||||
queryOptions.setBeExecVersion(Config.be_exec_version);
|
||||
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
|
||||
queryOptions.setEnableProfile(taskInfo.getEnableProfile());
|
||||
boolean isEnableMemtableOnSinkNode =
|
||||
destTable.getTableProperty().getUseSchemaLightChange()
|
||||
? taskInfo.isMemtableOnSinkNode() : false;
|
||||
queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
|
||||
queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
|
||||
|
||||
pipParams.setQueryOptions(queryOptions);
|
||||
TQueryGlobals queryGlobals = new TQueryGlobals();
|
||||
|
||||
@ -3186,6 +3186,14 @@ public class SessionVariable implements Serializable, Writable {
|
||||
this.enableSingleReplicaInsert = enableSingleReplicaInsert;
|
||||
}
|
||||
|
||||
public boolean isEnableMemtableOnSinkNode() {
|
||||
return enableMemtableOnSinkNode;
|
||||
}
|
||||
|
||||
public void setEnableMemtableOnSinkNode(boolean enableMemtableOnSinkNode) {
|
||||
this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
|
||||
}
|
||||
|
||||
public boolean isEnableRuntimeFilterPrune() {
|
||||
return enableRuntimeFilterPrune;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user