[enhancement](broker-load) fix-move-memtable-session-var-for-s3 (#28894)

This commit is contained in:
Siyang Tang
2023-12-22 23:25:06 +08:00
committed by GitHub
parent 3b830f89a7
commit 966766f3b7
3 changed files with 15 additions and 4 deletions

View File

@ -83,6 +83,8 @@ public class BrokerLoadJob extends BulkLoadJob {
// If set to true, the profile of load job with be pushed to ProfileManager
private boolean enableProfile = false;
private boolean enableMemTableOnSinkNode = false;
// for log replay and unit test
public BrokerLoadJob() {
super(EtlJobType.BROKER);
@ -93,8 +95,9 @@ public class BrokerLoadJob extends BulkLoadJob {
throws MetaNotFoundException {
super(EtlJobType.BROKER, dbId, label, originStmt, userInfo);
this.brokerDesc = brokerDesc;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enableProfile()) {
enableProfile = true;
if (ConnectContext.get() != null) {
enableProfile = ConnectContext.get().getSessionVariable().enableProfile();
enableMemTableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode;
}
}
@ -212,7 +215,7 @@ public class BrokerLoadJob extends BulkLoadJob {
isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(),
getLoadParallelism(), getSendBatchParallelism(),
getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(),
useNewLoadScanNode(), getPriority());
useNewLoadScanNode(), getPriority(), enableMemTableOnSinkNode);
UUID uuid = UUID.randomUUID();
TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());

View File

@ -72,6 +72,8 @@ public class LoadLoadingTask extends LoadTask {
private final boolean singleTabletLoadPerSink;
private final boolean useNewLoadScanNode;
private final boolean enableMemTableOnSinkNode;
private LoadingTaskPlanner planner;
private Profile jobProfile;
@ -83,7 +85,7 @@ public class LoadLoadingTask extends LoadTask {
long txnId, LoadTaskCallback callback, String timezone,
long timeoutS, int loadParallelism, int sendBatchParallelism,
boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink,
boolean useNewLoadScanNode, Priority priority) {
boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode) {
super(callback, TaskType.LOADING, priority);
this.db = db;
this.table = table;
@ -104,6 +106,7 @@ public class LoadLoadingTask extends LoadTask {
this.jobProfile = jobProfile;
this.singleTabletLoadPerSink = singleTabletLoadPerSink;
this.useNewLoadScanNode = useNewLoadScanNode;
this.enableMemTableOnSinkNode = enableMemTableOnSinkNode;
}
public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList,
@ -152,6 +155,7 @@ public class LoadLoadingTask extends LoadTask {
*/
curCoordinator.setLoadMemLimit(execMemLimit);
curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode);
try {
QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator);

View File

@ -2676,6 +2676,10 @@ public class Coordinator implements CoordInterface {
}
}
public void setMemTableOnSinkNode(boolean enableMemTableOnSinkNode) {
this.queryOptions.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode);
}
// map from a BE host address to the per-node assigned scan ranges;
// records scan range assignment for a single fragment
static class FragmentScanRangeAssignment