diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 6def2cc4e5..50e672498b 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -86,6 +86,10 @@ public: Status init_mem_trackers(const TUniqueId& query_id = TUniqueId()); const TQueryOptions& query_options() const { return _query_options; } + int64_t scan_queue_mem_limit() const { + return _query_options.__isset.scan_queue_mem_limit ? _query_options.scan_queue_mem_limit + : _query_options.mem_limit / 20; + } ObjectPool* obj_pool() const { return _obj_pool.get(); } const DescriptorTbl& desc_tbl() const { return *_desc_tbl; } diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index e082507e88..532cebefd4 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -302,11 +302,11 @@ Status VScanNode::_start_scanners(const std::list& scanners) { if (_is_pipeline_scan) { _scanner_ctx = pipeline::PipScannerContext::create_shared( _state, this, _input_tuple_desc, _output_tuple_desc, scanners, limit(), - _state->query_options().mem_limit / 20, _col_distribute_ids); + _state->scan_queue_mem_limit(), _col_distribute_ids); } else { - _scanner_ctx = ScannerContext::create_shared(_state, this, _input_tuple_desc, - _output_tuple_desc, scanners, limit(), - _state->query_options().mem_limit / 20); + _scanner_ctx = + ScannerContext::create_shared(_state, this, _input_tuple_desc, _output_tuple_desc, + scanners, limit(), _state->scan_queue_mem_limit()); } RETURN_IF_ERROR(_scanner_ctx->init()); return Status::OK(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java index b853615ae7..37eac5ac30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SetVar.java @@ -160,6 +160,10 @@ public class SetVar { this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue()))); this.result = (LiteralExpr) this.value; } + if (getVariable().equalsIgnoreCase(SessionVariable.SCAN_QUEUE_MEM_LIMIT)) { + this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getValue().getStringValue()))); + this.result = (LiteralExpr) this.value; + } if (getVariable().equalsIgnoreCase("is_report_success")) { variable = SessionVariable.ENABLE_PROFILE; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 9e55696c85..ad2143b3da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -62,6 +62,7 @@ public class SessionVariable implements Serializable, Writable { public static final Logger LOG = LogManager.getLogger(SessionVariable.class); public static final String EXEC_MEM_LIMIT = "exec_mem_limit"; + public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit"; public static final String QUERY_TIMEOUT = "query_timeout"; public static final String INSERT_TIMEOUT = "insert_timeout"; public static final String ENABLE_PROFILE = "enable_profile"; @@ -336,6 +337,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) public long maxExecMemByte = 2147483648L; + @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT) + public long maxScanQueueMemByte = 2147483648L / 20; + @VariableMgr.VarAttr(name = ENABLE_SPILLING) public boolean enableSpilling = false; @@ -1012,6 +1016,10 @@ public class SessionVariable implements Serializable, Writable { return maxExecMemByte; } + public long getMaxScanQueueExecMemByte() { + return maxScanQueueMemByte; + } + public int getQueryTimeoutS() { return queryTimeoutS; } @@ -1159,6 +1167,10 @@ public class SessionVariable implements Serializable, Writable { } } + public void setMaxScanQueueMemByte(long scanQueueMemByte) { + this.maxScanQueueMemByte = Math.min(scanQueueMemByte, maxExecMemByte / 20); + } + public boolean isSqlQuoteShowCreate() { return sqlQuoteShowCreate; } @@ -1743,6 +1755,7 @@ public class SessionVariable implements Serializable, Writable { public TQueryOptions toThrift() { TQueryOptions tResult = new TQueryOptions(); tResult.setMemLimit(maxExecMemByte); + tResult.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, maxExecMemByte / 20)); // TODO chenhao, reservation will be calculated by cost tResult.setMinReservation(0); @@ -1996,6 +2009,7 @@ public class SessionVariable implements Serializable, Writable { public TQueryOptions getQueryOptionVariables() { TQueryOptions queryOptions = new TQueryOptions(); queryOptions.setMemLimit(maxExecMemByte); + queryOptions.setScanQueueMemLimit(Math.min(maxScanQueueMemByte, maxExecMemByte / 20)); queryOptions.setQueryTimeout(queryTimeoutS); queryOptions.setInsertTimeout(insertTimeoutS); return queryOptions; diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 64ddd524bf..3b77b325b9 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -219,6 +219,8 @@ struct TQueryOptions { 71: optional bool enable_parquet_lazy_mat = true 72: optional bool enable_orc_lazy_mat = true + + 73: optional i64 scan_queue_mem_limit }