diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index f85a5218b3..0e4fadd24c 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1311,7 +1311,9 @@ void TaskWorkerPool::_report_disk_state_worker_thread_callback() {
disk.__set_used(root_path_info.is_used);
request.disks[root_path_info.path] = disk;
}
-
+ int num_cores = config::pipeline_executor_size > 0 ? config::pipeline_executor_size
+ : CpuInfo::num_cores();
+ request.__set_num_cores(num_cores);
_handle_report(request, ReportType::DISK);
}
StorageEngine::instance()->deregister_report_listener(this);
diff --git a/docs/en/docs/admin-manual/config/fe-config.md b/docs/en/docs/admin-manual/config/fe-config.md
index bc3acae538..6bb154f572 100644
--- a/docs/en/docs/admin-manual/config/fe-config.md
+++ b/docs/en/docs/admin-manual/config/fe-config.md
@@ -2701,3 +2701,11 @@ If false, when select from tables in information_schema database,
the result will not contain the information of the table in external catalog.
This is to avoid query time when external catalog is not reachable.
+#### `max_instance_num`
+
+
+
+Default: 128
+
+This is used to limit the setting of "parallel_fragment_exec_instance_num".
+"parallel_fragment_exec_instance_num" cannot be set higher than "max_instance_num".
\ No newline at end of file
diff --git a/docs/en/docs/query-acceleration/pipeline-execution-engine.md b/docs/en/docs/query-acceleration/pipeline-execution-engine.md
index ecebea78ee..994bdac6d9 100644
--- a/docs/en/docs/query-acceleration/pipeline-execution-engine.md
+++ b/docs/en/docs/query-acceleration/pipeline-execution-engine.md
@@ -76,3 +76,8 @@ The default configuration of `parallel_fragment_exec_instance_num` represents th
```
set parallel_fragment_exec_instance_num = 16;
```
+
+Specifically, if set to 0, the concurrency in the Pipeline execution engine will automatically be set to half of the number of CPU cores.
+"parallel_fragment_exec_instance_num" cannot be set higher than the "max_instance_num" in "fe.conf" (default is 128).
+
+
diff --git a/docs/zh-CN/docs/admin-manual/config/fe-config.md b/docs/zh-CN/docs/admin-manual/config/fe-config.md
index 99b97544bd..57612c7d26 100644
--- a/docs/zh-CN/docs/admin-manual/config/fe-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/fe-config.md
@@ -2703,3 +2703,11 @@ show data (其他用法:HELP SHOW DATA)
当设置为 false 时,查询 `information_schema` 中的表时,将不再返回 external catalog 中的表的信息。
这个参数主要用于避免因 external catalog 无法访问、信息过多等原因导致的查询 `information_schema` 超时的问题。
+
+#### `max_instance_num`
+
+
+
+默认值:128
+
+用于限制parallel_fragment_exec_instance_num的设置,set parallel_fragment_exec_instance_num不能超过max_instance_num
\ No newline at end of file
diff --git a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
index f3bcd2b7e7..a23336a63b 100644
--- a/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
+++ b/docs/zh-CN/docs/query-acceleration/pipeline-execution-engine.md
@@ -76,3 +76,5 @@ set enable_pipeline_engine = true;
```
set parallel_fragment_exec_instance_num = 16;
```
+
+特别的,如果设置为0, 则在Pipeline 执行引擎中的并发数会自动的设置为cpu核心数目的一半。并且parallel_fragment_exec_instance_num不能设置超过fe.conf中的max_instance_num(默认128)
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 38df46f922..33141597ec 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1932,6 +1932,15 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static boolean disable_datev1 = true;
+ /*
+ * "max_instance_num" is used to set the maximum concurrency. When the value set
+ * by "parallel_fragment_exec_instance_num" is greater than "max_instance_num",
+ * an error will be reported.
+ */
+ @ConfField(mutable = true)
+ public static int max_instance_num = 128;
+
+
/**
* This config used for export/outfile.
* Whether delete all files in the directory specified by export/outfile.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
index 37eac5ac30..ccb505bbe4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java
@@ -19,6 +19,7 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
@@ -155,7 +156,14 @@ public class SetVar {
this.value = new StringLiteral(TimeUtils.checkTimeZoneValidAndStandardize(getValue().getStringValue()));
this.result = (LiteralExpr) this.value;
}
-
+ if (getVariable().equalsIgnoreCase(SessionVariable.PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)) {
+ int instanceNum = Integer.parseInt(getValue().getStringValue());
+ if (instanceNum > Config.max_instance_num) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR,
+ SessionVariable.PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM,
+ instanceNum + "(Should not be set to more than " + Config.max_instance_num + ")");
+ }
+ }
if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) {
this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue())));
this.result = (LiteralExpr) this.value;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 646bed22a3..718e78ca18 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -194,6 +194,11 @@ public class ReportHandler extends Daemon {
LOG.info("receive report from be {}. type: {}, current queue size: {}",
backend.getId(), reportType, reportQueue.size());
+ if (reportType == ReportType.DISK) {
+ Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
+ int numCores = request.getNumCores();
+ beinfoCollector.addBeInfo(beId, numCores);
+ }
return result;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index ad2143b3da..fcde985f42 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.nereids.metrics.Event;
import org.apache.doris.nereids.metrics.EventSwitchParser;
import org.apache.doris.qe.VariableMgr.VarAttr;
+import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TResourceLimit;
import org.apache.doris.thrift.TRuntimeFilterType;
@@ -1224,7 +1225,12 @@ public class SessionVariable implements Serializable, Writable {
}
public int getParallelExecInstanceNum() {
- return parallelExecInstanceNum;
+ if (enablePipelineEngine && parallelExecInstanceNum == 0) {
+ Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
+ return beinfoCollector.getParallelExecInstanceNum();
+ } else {
+ return parallelExecInstanceNum;
+ }
}
public int getExchangeInstanceParallel() {
@@ -1768,7 +1774,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setCodegenLevel(codegenLevel);
tResult.setBeExecVersion(Config.be_exec_version);
tResult.setEnablePipelineEngine(enablePipelineEngine);
- tResult.setParallelInstance(parallelExecInstanceNum);
+ tResult.setParallelInstance(getParallelExecInstanceNum());
tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 6f7b9326ed..fd7c8fc003 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -289,7 +289,7 @@ public class StmtExecutor {
builder.instancesNumPerBe(
beToInstancesNum.entrySet().stream().map(entry -> entry.getKey() + ":" + entry.getValue())
.collect(Collectors.joining(",")));
- builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.parallelExecInstanceNum));
+ builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum()));
builder.traceId(context.getSessionVariable().getTraceId());
return builder.build();
}
@@ -2206,4 +2206,3 @@ public class StmtExecutor {
}
}
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
index dc661ef757..f1faaa20a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/Backend.java
@@ -49,6 +49,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -513,7 +514,6 @@ public class Backend implements Writable {
long dataUsedCapacityB = tDisk.getDataUsedCapacity();
long diskAvailableCapacityB = tDisk.getDiskAvailableCapacity();
boolean isUsed = tDisk.isUsed();
-
DiskInfo diskInfo = disks.get(rootPath);
if (diskInfo == null) {
diskInfo = new DiskInfo(rootPath);
@@ -799,4 +799,64 @@ public class Backend implements Writable {
public String getTagMapString() {
return "{" + new PrintableMap<>(tagMap, ":", true, false).toString() + "}";
}
+
+ public static BeInfoCollector getBeInfoCollector() {
+ return BeInfoCollector.get();
+ }
+
+ public static class BeInfoCollector {
+ private int numCores = 1;
+ private static volatile BeInfoCollector instance = null;
+ private static final Map Info = new ConcurrentHashMap<>();
+
+ private BeInfoCollector(int numCores) {
+ this.numCores = numCores;
+ }
+
+ public static BeInfoCollector get() {
+ if (instance == null) {
+ synchronized (BeInfoCollector.class) {
+ if (instance == null) {
+ instance = new BeInfoCollector(Integer.MAX_VALUE);
+ }
+ }
+ }
+ return instance;
+ }
+
+ public int getNumCores() {
+ return numCores;
+ }
+
+ public void clear() {
+ Info.clear();
+ }
+
+ public void addBeInfo(long beId, int numCores) {
+ Info.put(beId, new BeInfoCollector(numCores));
+ }
+
+ public void dropBeInfo(long beId) {
+ Info.remove(beId);
+ }
+
+ public int getMinNumCores() {
+ int minNumCores = Integer.MAX_VALUE;
+ for (BeInfoCollector beinfo : Info.values()) {
+ minNumCores = Math.min(minNumCores, beinfo.getNumCores());
+ }
+ return Math.max(1, minNumCores);
+ }
+
+ public int getParallelExecInstanceNum() {
+ if (getMinNumCores() == Integer.MAX_VALUE) {
+ return 1;
+ }
+ return (getMinNumCores() + 1) / 2;
+ }
+
+ public BeInfoCollector getBeInfoCollectorById(long beId) {
+ return Info.get(beId);
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 76a664bb28..16883fe745 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -270,6 +270,9 @@ public class SystemInfoService {
}
dropBackend(backend.getIp(), backend.getHostName(), backend.getHeartbeatPort());
+ // update BeInfoCollector
+ Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
+ beinfoCollector.dropBeInfo(backendId);
}
// final entry of dropping backend
@@ -1231,6 +1234,10 @@ public class SystemInfoService {
copiedReportVersions.remove(backend.getId());
ImmutableMap newIdToReportVersion = ImmutableMap.copyOf(copiedReportVersions);
idToReportVersionRef = newIdToReportVersion;
+
+ // update BeInfoCollector
+ Backend.BeInfoCollector beinfoCollector = Backend.getBeInfoCollector();
+ beinfoCollector.dropBeInfo(backend.getId());
}
public void updateBackendState(Backend be) {
diff --git a/gensrc/thrift/MasterService.thrift b/gensrc/thrift/MasterService.thrift
index 893bb0171a..18d95b3854 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -101,6 +101,7 @@ struct TReportRequest {
8: optional i64 tablet_max_compaction_score
9: optional list storage_policy // only id and version
10: optional list resource // only id and version
+ 11: i32 num_cores
}
struct TMasterResult {
diff --git a/regression-test/data/dynamic_table_p0/sql/q01.out b/regression-test/data/dynamic_table_p0/sql/q01.out
index 9719048c79..c2daca09b1 100644
--- a/regression-test/data/dynamic_table_p0/sql/q01.out
+++ b/regression-test/data/dynamic_table_p0/sql/q01.out
@@ -15,12 +15,12 @@
[[8188473317676292]] 113
[[1246015209249282]] 137
[[2507054251761891]] 705
-[[3773869957247344]] 762
-[[8270995445718570]] 762
[[5110638976348890]] 762
-[[1010130250138698]] 769
+[[8270995445718570]] 762
+[[3773869957247344]] 762
[[7412557565511915]] 769
[[8987047093946431]] 769
+[[1010130250138698]] 769
[[4455327319328767]] 783
[[8828150595154990]] 885
[[7159775357516943]] 1104
@@ -60,14 +60,14 @@
[[4747614140915628]] 2535
[[8597511952125331]] 2535
[[5730174107868168], [5730174107868168]] 2596
-[[4354883209311598]] 2660
[[8119601212676566]] 2660
+[[4354883209311598]] 2660
[[2043666895665144]] 2700
[[3521040327201062]] 2767
[[5776942886505092]] 2787
[[359661801933760]] 2841
-[[153644039088669]] 2842
[[7004296435935771]] 2842
+[[153644039088669]] 2842
[[1627354218830689]] 2842
[[5859840954071140]] 2918
[[4386667172417724]] 2944
@@ -83,9 +83,9 @@
[[3724245364169931]] 4416
[[7916138178043215]] 4420
[[7763869997273429]] 4452
-[[2782955878363126]] 4966
-[[2354480809921026]] 4966
[[670650193123303]] 4966
+[[2354480809921026]] 4966
+[[2782955878363126]] 4966
[[5165127671898678]] 5000
[[6389401809481499]] 5355
[[7615002208576309]] 6460
@@ -97,11 +97,11 @@
[[8088693211060628]] 9441
[[1693725099154825]] 9441
[[1809442347930970]] 9550
+[[1946642491818096]] 9550
[[3825997181791941]] 9550
[[8070919951629642]] 9550
-[[1946642491818096]] 9550
-[[1421733087335887]] 9550
[[2525418264579859]] 9550
+[[1421733087335887]] 9550
[[8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849], [8344178849407849]] 9597
[[8092402219407069]] 15236
[[1472749710201141]] 29743
@@ -114,106 +114,106 @@
[[8485415091903596], [8485415091903596], [8485415091903596], [8485415091903596], [8485415091903596], [8485415091903596], [8485415091903596]] 140000
-- !q01_6 --
-153644039088669
-359661801933760
-433649683957555
-461763776919610
-670650193123303
-793107092391462
-909085960590569
-1010130250138698
1062399915275687
-1246015209249282
-1421733087335887
-1472749710201141
-1594793682472250
-1627354218830689
-1685792381137272
-1693725099154825
-1809442347930970
-1946642491818096
-2042138032090263
-2043666895665144
-2264322793739384
-2321583468933540
-2354480809921026
-2371994091383347
-2377102276803763
-2401724312046667
-2507054251761891
-2525418264579859
-2663410947740189
-2782955878363126
-2957271339174925
-3246031659541791
-3467530916498381
-3521040327201062
-3724245364169931
-3773869957247344
-3825997181791941
-4095256255478091
-4245294408202541
-4354883209311598
-4386667172417724
-4455327319328767
-4590062018629189
-4747614140915628
-4809930628427145
-4975090293678843
5110638976348890
-5131906219034408
-5165127671898678
-5220311469220731
-5343737085255499
-5588351545916968
-5679970266278027
-5730174107868168
-5776942886505092
-5790470732472115
-5814976036215560
-5827569795054362
-5857463922541047
-5859840954071140
-6313101949227572
-6337013526045930
-6389401809481499
-6403234720218018
6437188822160102
-6492680839765176
-7004296435935771
+670650193123303
+8119601212676566
+359661801933760
7010643802717876
-7119085210022543
-7159775357516943
+5730174107868168
+5679970266278027
+2663410947740189
+8344178849407849
+8088693211060628
+8440511462993326
+1809442347930970
+793107092391462
+433649683957555
+3467530916498381
+2264322793739384
+4455327319328767
+2377102276803763
+5165127671898678
+8322339034003297
+2371994091383347
+3246031659541791
+8270995445718570
+2043666895665144
+7929380011639087
+1246015209249282
+4747614140915628
+1594793682472250
+5588351545916968
+8168903104703233
+6313101949227572
+909085960590569
+2042138032090263
+4095256255478091
+8536831366526581
7170312234638999
7300691594757017
-7367822710336978
-7388338586294277
-7412557565511915
-7615002208576309
-7763869997273429
-7916138178043215
-7929380011639087
-8049598119637189
-8070919951629642
-8088693211060628
+5776942886505092
8092402219407069
-8119601212676566
-8135216530942053
-8168903104703233
-8188473317676292
-8270995445718570
-8305732448960719
-8322339034003297
-8344178849407849
-8440511462993326
-8485415091903596
-8536831366526581
-8597511952125331
-8608916026515418
+7367822710336978
+5343737085255499
+6389401809481499
+6492680839765176
+6337013526045930
+1946642491818096
+1685792381137272
+2354480809921026
+5814976036215560
+1693725099154825
+2401724312046667
+7004296435935771
+4354883209311598
8708146704871155
-8812535475818332
-8828150595154990
+7119085210022543
+8597511952125331
+7412557565511915
+4386667172417724
+2321583468933540
+8305732448960719
+3773869957247344
+4245294408202541
+1472749710201141
+2782955878363126
+461763776919610
+2957271339174925
+8188473317676292
+5220311469220731
8987047093946431
+5131906219034408
+153644039088669
+8049598119637189
+4809930628427145
+1010130250138698
+6403234720218018
+7615002208576309
+2507054251761891
+7916138178043215
+3825997181791941
+8070919951629642
+7763869997273429
+5790470732472115
+7388338586294277
+5827569795054362
+2525418264579859
+3724245364169931
+8828150595154990
+7159775357516943
+5859840954071140
+4590062018629189
+8812535475818332
+3521040327201062
+8485415091903596
+8135216530942053
+5857463922541047
+1627354218830689
+1421733087335887
+8608916026515418
+4975090293678843
-- !q01_7 --
[3521040327201062, 3521040327201062] 2767 834842
diff --git a/regression-test/suites/dynamic_table_p0/sql/q01.sql b/regression-test/suites/dynamic_table_p0/sql/q01.sql
index e9139983d3..14f730426e 100644
--- a/regression-test/suites/dynamic_table_p0/sql/q01.sql
+++ b/regression-test/suites/dynamic_table_p0/sql/q01.sql
@@ -2,6 +2,6 @@ SELECT count() FROM test_btc_json;
SELECT avg(fee) FROM test_btc_json;
SELECT avg(size(`inputs.prev_out.spending_outpoints.n`)) FROM test_btc_json;
SELECT avg(size(`inputs.prev_out.spending_outpoints.tx_index`)) FROM test_btc_json;
-select `inputs.prev_out.spending_outpoints.tx_index`, fee from test_btc_json order by fee;
-select `out.tx_index`[-1] from test_btc_json order by `out.tx_index`[-1];
+select `inputs.prev_out.spending_outpoints.tx_index`, fee from test_btc_json order by fee,hash;
+select `out.tx_index`[-1] from test_btc_json order by hash,`out.tx_index`[-1];
select `out.tx_index`, fee, `out.value`[1] from test_btc_json where array_contains(`out.value`, 2450939412);
\ No newline at end of file
diff --git a/regression-test/suites/dynamic_table_p0/sql/q04.sql b/regression-test/suites/dynamic_table_p0/sql/q04.sql
index a1c6873fd7..d42e7328f9 100644
--- a/regression-test/suites/dynamic_table_p0/sql/q04.sql
+++ b/regression-test/suites/dynamic_table_p0/sql/q04.sql
@@ -1,4 +1,4 @@
-select `answers.date`, `answers.user` from test_es_nested_json where size(`answers.date`) > 3 limit 10;
+select `answers.date`, `answers.user` from test_es_nested_json where size(`answers.date`) > 3 order by qid limit 10 ;
select `answers.date`[1], qid, title from test_es_nested_json where array_contains(`answers.user`, 'Michael Ecklund (804104)') order by qid limit 10;
select qid, title from test_es_nested_json where array_contains(`answers.date`, '2012-04-03T19:35:38.007');
select `answers.date` from test_es_nested_json where size(`answers.date`) > 0 order by `answers.date`[1] limit 10;