From 966766f3b752dfcd66a20351830a8b75d1f7e1b2 Mon Sep 17 00:00:00 2001 From: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com> Date: Fri, 22 Dec 2023 23:25:06 +0800 Subject: [PATCH] [enhancement](broker-load) fix-move-memtable-session-var-for-s3 (#28894) --- .../java/org/apache/doris/load/loadv2/BrokerLoadJob.java | 9 ++++++--- .../org/apache/doris/load/loadv2/LoadLoadingTask.java | 6 +++++- .../src/main/java/org/apache/doris/qe/Coordinator.java | 4 ++++ 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index fd416b8e44..da89db859d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -83,6 +83,8 @@ public class BrokerLoadJob extends BulkLoadJob { // If set to true, the profile of load job with be pushed to ProfileManager private boolean enableProfile = false; + private boolean enableMemTableOnSinkNode = false; + // for log replay and unit test public BrokerLoadJob() { super(EtlJobType.BROKER); @@ -93,8 +95,9 @@ public class BrokerLoadJob extends BulkLoadJob { throws MetaNotFoundException { super(EtlJobType.BROKER, dbId, label, originStmt, userInfo); this.brokerDesc = brokerDesc; - if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enableProfile()) { - enableProfile = true; + if (ConnectContext.get() != null) { + enableProfile = ConnectContext.get().getSessionVariable().enableProfile(); + enableMemTableOnSinkNode = ConnectContext.get().getSessionVariable().enableMemtableOnSinkNode; } } @@ -212,7 +215,7 @@ public class BrokerLoadJob extends BulkLoadJob { isStrictMode(), isPartialUpdate(), transactionId, this, getTimeZone(), getTimeout(), getLoadParallelism(), getSendBatchParallelism(), getMaxFilterRatio() <= 0, enableProfile ? jobProfile : null, isSingleTabletLoadPerSink(), - useNewLoadScanNode(), getPriority()); + useNewLoadScanNode(), getPriority(), enableMemTableOnSinkNode); UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index d98ba2ba4a..fa2eadcfa6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -72,6 +72,8 @@ public class LoadLoadingTask extends LoadTask { private final boolean singleTabletLoadPerSink; private final boolean useNewLoadScanNode; + private final boolean enableMemTableOnSinkNode; + private LoadingTaskPlanner planner; private Profile jobProfile; @@ -83,7 +85,7 @@ public class LoadLoadingTask extends LoadTask { long txnId, LoadTaskCallback callback, String timezone, long timeoutS, int loadParallelism, int sendBatchParallelism, boolean loadZeroTolerance, Profile jobProfile, boolean singleTabletLoadPerSink, - boolean useNewLoadScanNode, Priority priority) { + boolean useNewLoadScanNode, Priority priority, boolean enableMemTableOnSinkNode) { super(callback, TaskType.LOADING, priority); this.db = db; this.table = table; @@ -104,6 +106,7 @@ public class LoadLoadingTask extends LoadTask { this.jobProfile = jobProfile; this.singleTabletLoadPerSink = singleTabletLoadPerSink; this.useNewLoadScanNode = useNewLoadScanNode; + this.enableMemTableOnSinkNode = enableMemTableOnSinkNode; } public void init(TUniqueId loadId, List> fileStatusList, @@ -152,6 +155,7 @@ public class LoadLoadingTask extends LoadTask { */ curCoordinator.setLoadMemLimit(execMemLimit); curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000)); + curCoordinator.setMemTableOnSinkNode(enableMemTableOnSinkNode); try { QeProcessorImpl.INSTANCE.registerQuery(loadId, curCoordinator); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 9564461d75..ed7f73e173 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2676,6 +2676,10 @@ public class Coordinator implements CoordInterface { } } + public void setMemTableOnSinkNode(boolean enableMemTableOnSinkNode) { + this.queryOptions.setEnableMemtableOnSinkNode(enableMemTableOnSinkNode); + } + // map from a BE host address to the per-node assigned scan ranges; // records scan range assignment for a single fragment static class FragmentScanRangeAssignment