Add downgrade scan thread num by column num (#35351)
This commit is contained in:
@ -80,6 +80,28 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu
|
||||
: state->query_parallel_instance_num());
|
||||
_max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
|
||||
_max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
|
||||
|
||||
// When the user does not specify scan_thread_num, we can try to downgrade _max_thread_num.
|
||||
// because we found that in a table with 5k columns, column readers may occupy too much memory.
|
||||
// you can refer https://github.com/apache/doris/issues/35340 for details.
|
||||
int32_t max_column_reader_num = state->query_options().max_column_reader_num;
|
||||
if (_max_thread_num != 1 && max_column_reader_num > 0) {
|
||||
int32_t scan_column_num = _output_tuple_desc->slots().size();
|
||||
int32_t current_column_num = scan_column_num * _max_thread_num;
|
||||
if (current_column_num > max_column_reader_num) {
|
||||
int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
|
||||
new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
|
||||
if (new_max_thread_num < _max_thread_num) {
|
||||
int32_t origin_max_thread_num = _max_thread_num;
|
||||
_max_thread_num = new_max_thread_num;
|
||||
LOG(INFO) << "downgrade query:" << print_id(state->query_id())
|
||||
<< " scan's max_thread_num from " << origin_max_thread_num << " to "
|
||||
<< _max_thread_num << ",column num: " << scan_column_num
|
||||
<< ", max_column_reader_num: " << max_column_reader_num;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 1. Calculate max concurrency
|
||||
// For select * from table limit 10; should just use one thread.
|
||||
if ((_parent && _parent->should_run_serial()) ||
|
||||
|
||||
@ -555,6 +555,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
public static final String BYPASS_WORKLOAD_GROUP = "bypass_workload_group";
|
||||
|
||||
public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num";
|
||||
|
||||
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
|
||||
SKIP_DELETE_PREDICATE,
|
||||
SKIP_DELETE_BITMAP,
|
||||
@ -682,6 +684,9 @@ public class SessionVariable implements Serializable, Writable {
|
||||
"whether bypass workload group's limitation, currently only support bypass query queue"})
|
||||
public boolean bypassWorkloadGroup = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM)
|
||||
public int maxColumnReaderNum = 20000;
|
||||
|
||||
@VariableMgr.VarAttr(name = RESOURCE_VARIABLE)
|
||||
public String resourceGroup = "";
|
||||
|
||||
@ -2346,6 +2351,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
return this.bypassWorkloadGroup;
|
||||
}
|
||||
|
||||
public int getMaxColumnReaderNum() {
|
||||
return this.maxColumnReaderNum;
|
||||
}
|
||||
|
||||
public String getResourceGroup() {
|
||||
return resourceGroup;
|
||||
}
|
||||
@ -3141,6 +3150,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
tResult.setScanQueueMemLimit(maxScanQueueMemByte);
|
||||
tResult.setNumScannerThreads(numScannerThreads);
|
||||
tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
|
||||
tResult.setMaxColumnReaderNum(maxColumnReaderNum);
|
||||
|
||||
// TODO chenhao, reservation will be calculated by cost
|
||||
tResult.setMinReservation(0);
|
||||
|
||||
@ -291,6 +291,8 @@ struct TQueryOptions {
|
||||
|
||||
110: optional bool enable_parquet_filter_by_min_max = true
|
||||
111: optional bool enable_orc_filter_by_min_max = true
|
||||
|
||||
112: optional i32 max_column_reader_num = 0
|
||||
|
||||
// For cloud, to control if the content would be written into file cache
|
||||
1000: optional bool disable_file_cache = false
|
||||
|
||||
Reference in New Issue
Block a user