[chore](session_variable) Add 'data_queue_max_blocks' to prevent the DataQueue from occupying too much memory. (#34017) (#34395)

This commit is contained in:
Jerry Hu
2024-05-05 21:20:33 +08:00
committed by GitHub
parent c3096cabe2
commit 7248420cfd
9 changed files with 69 additions and 1 deletions

View File

@ -119,10 +119,13 @@ Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
}
_cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes();
_cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0 &&
_sink_dependencies[_flag_queue_idx] != nullptr) {
_sink_dependencies[_flag_queue_idx]->set_ready();
}
auto old_value = _cur_blocks_total_nums.fetch_sub(1);
if (old_value == 1 && _source_dependency) {
set_source_block();
_sink_dependencies[_flag_queue_idx]->set_ready();
}
} else {
if (_is_finished[_flag_queue_idx]) {
@ -142,6 +145,11 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
_cur_bytes_in_queue[child_idx] += block->allocated_bytes();
_queue_blocks[child_idx].emplace_back(std::move(block));
_cur_blocks_nums_in_queue[child_idx] += 1;
if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue &&
_sink_dependencies[child_idx] != nullptr) {
_sink_dependencies[child_idx]->block();
}
_cur_blocks_total_nums++;
if (_source_dependency) {
set_source_ready();

View File

@ -70,6 +70,8 @@ public:
void set_source_ready();
void set_source_block();
void set_max_blocks_in_sub_queue(int64_t max_blocks) { _max_blocks_in_sub_queue = max_blocks; }
private:
std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
@ -93,6 +95,8 @@ private:
// only used by streaming agg source operator
bool _data_exhausted = false;
int64_t _max_blocks_in_sub_queue = 1;
//this only use to record the queue[0] for profile
int64_t _max_bytes_in_queue = 0;
int64_t _max_size_of_queue = 0;

View File

@ -111,6 +111,7 @@ Status UnionSinkLocalState::open(RuntimeState* state) {
for (size_t i = 0; i < p._child_expr.size(); i++) {
RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
}
_shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks());
return Status::OK();
}

View File

@ -542,6 +542,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
} else {
int child_count = union_node->children_count();
auto data_queue = std::make_shared<DataQueue>(child_count);
data_queue->set_max_blocks_in_sub_queue(_runtime_state->data_queue_max_blocks());
for (int child_id = 0; child_id < child_count; ++child_id) {
auto new_child_pipeline = add_pipeline();
RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline));
@ -564,8 +565,11 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
std::to_string(agg_node->id()) +
": group by and output is empty");
}
const int64_t data_queue_max_blocks = _runtime_state->data_queue_max_blocks();
if (agg_node->is_aggregate_evaluators_empty() && !agg_node->is_probe_expr_ctxs_empty()) {
auto data_queue = std::make_shared<DataQueue>(1);
data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
OperatorBuilderPtr pre_agg_sink =
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
data_queue);
@ -577,6 +581,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
} else if (agg_node->is_streaming_preagg() && !agg_node->is_probe_expr_ctxs_empty()) {
auto data_queue = std::make_shared<DataQueue>(1);
data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>(
node->id(), agg_node, data_queue);
RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));

View File

@ -431,6 +431,11 @@ public:
return _query_options.__isset.skip_missing_version && _query_options.skip_missing_version;
}
int64_t data_queue_max_blocks() const {
return _query_options.__isset.data_queue_max_blocks ? _query_options.data_queue_max_blocks
: 1;
}
bool enable_page_cache() const;
int partitioned_hash_join_rows_threshold() const {

View File

@ -493,6 +493,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
public static final String ENABLE_AGG_SPILL = "enable_agg_spill";
public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
@ -1745,6 +1746,13 @@ public class SessionVariable implements Serializable, Writable {
needForward = true, fuzzy = true)
public boolean enableAggSpill = false;
@VariableMgr.VarAttr(
name = DATA_QUEUE_MAX_BLOCKS,
description = {"DataQueue 中每个子队列允许最大的 block 个数",
"Max blocks in DataQueue."},
needForward = true, fuzzy = true)
public long dataQueueMaxBlocks = 1;
// If the memory consumption of sort node exceed this limit, will trigger spill to disk;
// Set to 0 to disable; min: 128M
public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@ -3171,6 +3179,8 @@ public class SessionVariable implements Serializable, Writable {
tResult.setEnableSortSpill(enableSortSpill);
tResult.setEnableAggSpill(enableAggSpill);
tResult.setMinRevocableMem(minRevocableMem);
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
return tResult;
}

View File

@ -285,6 +285,9 @@ struct TQueryOptions {
104: optional i64 min_revocable_mem = 0
105: optional i64 spill_streaming_agg_mem_limit = 0;
// max rows of each sub-queue in DataQueue.
106: optional i64 data_queue_max_blocks = 0;
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false

View File

@ -0,0 +1,6 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select --
true
false
\N

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
suite("distinct_streaming_agg") {
sql """ use test_query_db; """
qt_select """
select k6 from baseall union select k6 from bigtable order by 1;
"""
}