From e892774c9a59b9eb4927665cc2f744a8b89932b0 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Wed, 20 Mar 2024 08:20:18 +0800 Subject: [PATCH] [improvement](agg) streaming agg should not take too much memory when spilling enabled (#32426) --- .../exec/streaming_aggregation_operator.cpp | 139 +++++++++--------- .../exec/streaming_aggregation_operator.h | 5 +- .../org/apache/doris/qe/SessionVariable.java | 10 +- gensrc/thrift/PaloInternalService.thrift | 2 + 4 files changed, 84 insertions(+), 72 deletions(-) diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index ab9a19ed6a..31ae9ba423 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -622,6 +622,10 @@ size_t StreamingAggLocalState::_memory_usage() const { usage += _aggregate_data_container->memory_usage(); } + std::visit( + [&](auto&& agg_method) { usage += agg_method.hash_table->get_buffer_size_in_bytes(); }, + _agg_data->method_variant); + return usage; } @@ -656,72 +660,68 @@ Status StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B // to avoid wasting memory. // But for fixed hash map, it never need to expand bool ret_flag = false; + const auto spill_streaming_agg_mem_limit = + _parent->cast()._spill_streaming_agg_mem_limit; + const bool used_too_much_memory = + spill_streaming_agg_mem_limit > 0 && _memory_usage() > spill_streaming_agg_mem_limit; RETURN_IF_ERROR(std::visit( [&](auto&& agg_method) -> Status { - if (auto& hash_tbl = *agg_method.hash_table; - hash_tbl.add_elem_size_overflow(rows)) { - /// If too much memory is used during the pre-aggregation stage, - /// it is better to output the data directly without performing further aggregation. - const bool used_too_much_memory = - (_parent->cast()._external_agg_bytes_threshold > - 0 && - _memory_usage() > _parent->cast() - ._external_agg_bytes_threshold); - // do not try to do agg, just init and serialize directly return the out_block - if (!_should_expand_preagg_hash_tables() || used_too_much_memory) { - SCOPED_TIMER(_streaming_agg_timer); - ret_flag = true; + auto& hash_tbl = *agg_method.hash_table; + /// If too much memory is used during the pre-aggregation stage, + /// it is better to output the data directly without performing further aggregation. + // do not try to do agg, just init and serialize directly return the out_block + if (used_too_much_memory || (hash_tbl.add_elem_size_overflow(rows) && + !_should_expand_preagg_hash_tables())) { + SCOPED_TIMER(_streaming_agg_timer); + ret_flag = true; - // will serialize value data to string column. - // non-nullable column(id in `_make_nullable_keys`) - // will be converted to nullable. - bool mem_reuse = p._make_nullable_keys.empty() && out_block->mem_reuse(); + // will serialize value data to string column. + // non-nullable column(id in `_make_nullable_keys`) + // will be converted to nullable. + bool mem_reuse = p._make_nullable_keys.empty() && out_block->mem_reuse(); - std::vector data_types; - vectorized::MutableColumns value_columns; - for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - auto data_type = - _aggregate_evaluators[i]->function()->get_serialized_type(); - if (mem_reuse) { - value_columns.emplace_back( - std::move(*out_block->get_by_position(i + key_size).column) - .mutate()); - } else { - // slot type of value it should always be string type - value_columns.emplace_back(_aggregate_evaluators[i] - ->function() - ->create_serialize_column()); - } - data_types.emplace_back(data_type); - } - - for (int i = 0; i != _aggregate_evaluators.size(); ++i) { - SCOPED_TIMER(_serialize_data_timer); - RETURN_IF_ERROR( - _aggregate_evaluators[i]->streaming_agg_serialize_to_column( - in_block, value_columns[i], rows, - _agg_arena_pool.get())); - } - - if (!mem_reuse) { - vectorized::ColumnsWithTypeAndName columns_with_schema; - for (int i = 0; i < key_size; ++i) { - columns_with_schema.emplace_back( - key_columns[i]->clone_resized(rows), - _probe_expr_ctxs[i]->root()->data_type(), - _probe_expr_ctxs[i]->root()->expr_name()); - } - for (int i = 0; i < value_columns.size(); ++i) { - columns_with_schema.emplace_back(std::move(value_columns[i]), - data_types[i], ""); - } - out_block->swap(vectorized::Block(columns_with_schema)); + std::vector data_types; + vectorized::MutableColumns value_columns; + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + auto data_type = + _aggregate_evaluators[i]->function()->get_serialized_type(); + if (mem_reuse) { + value_columns.emplace_back( + std::move(*out_block->get_by_position(i + key_size).column) + .mutate()); } else { - for (int i = 0; i < key_size; ++i) { - std::move(*out_block->get_by_position(i).column) - .mutate() - ->insert_range_from(*key_columns[i], 0, rows); - } + // slot type of value it should always be string type + value_columns.emplace_back(_aggregate_evaluators[i] + ->function() + ->create_serialize_column()); + } + data_types.emplace_back(data_type); + } + + for (int i = 0; i != _aggregate_evaluators.size(); ++i) { + SCOPED_TIMER(_serialize_data_timer); + RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize_to_column( + in_block, value_columns[i], rows, _agg_arena_pool.get())); + } + + if (!mem_reuse) { + vectorized::ColumnsWithTypeAndName columns_with_schema; + for (int i = 0; i < key_size; ++i) { + columns_with_schema.emplace_back( + key_columns[i]->clone_resized(rows), + _probe_expr_ctxs[i]->root()->data_type(), + _probe_expr_ctxs[i]->root()->expr_name()); + } + for (int i = 0; i < value_columns.size(); ++i) { + columns_with_schema.emplace_back(std::move(value_columns[i]), + data_types[i], ""); + } + out_block->swap(vectorized::Block(columns_with_schema)); + } else { + for (int i = 0; i < key_size; ++i) { + std::move(*out_block->get_by_position(i).column) + .mutate() + ->insert_range_from(*key_columns[i], 0, rows); } } } @@ -1154,16 +1154,17 @@ Status StreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* state) _aggregate_evaluators.push_back(evaluator); } - const auto& agg_functions = tnode.agg_node.aggregate_functions; - _external_agg_bytes_threshold = state->external_agg_bytes_threshold(); - - if (_external_agg_bytes_threshold > 0) { - _spill_partition_count_bits = 4; - if (state->query_options().__isset.external_agg_partition_bits) { - _spill_partition_count_bits = state->query_options().external_agg_partition_bits; - } + if (state->enable_agg_spill()) { + // If spill enabled, the streaming agg should not occupy too much memory. + _spill_streaming_agg_mem_limit = + state->query_options().__isset.spill_streaming_agg_mem_limit + ? state->query_options().spill_streaming_agg_mem_limit + : 0; + } else { + _spill_streaming_agg_mem_limit = 0; } + const auto& agg_functions = tnode.agg_node.aggregate_functions; _is_merge = std::any_of(agg_functions.cbegin(), agg_functions.cend(), [](const auto& e) { return e.nodes[0].agg_expr.is_merge_agg; }); _op_name = "STREAMING_AGGREGATION_OPERATOR"; diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h b/be/src/pipeline/exec/streaming_aggregation_operator.h index f1d95cd54e..9125437f4a 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/streaming_aggregation_operator.h @@ -212,13 +212,14 @@ private: vectorized::Sizes _offsets_of_aggregate_states; /// The total size of the row from the aggregate functions. size_t _total_size_of_aggregate_states = 0; - size_t _external_agg_bytes_threshold; + + /// When spilling is enabled, the streaming agg should not occupy too much memory. + size_t _spill_streaming_agg_mem_limit; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; std::vector _aggregate_evaluators; bool _can_short_circuit = false; std::vector _make_nullable_keys; - size_t _spill_partition_count_bits; bool _have_conjuncts; }; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 705e1ea656..165a05fe85 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -481,6 +481,7 @@ public class SessionVariable implements Serializable, Writable { public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold"; public static final String EXTERNAL_AGG_BYTES_THRESHOLD = "external_agg_bytes_threshold"; public static final String EXTERNAL_AGG_PARTITION_BITS = "external_agg_partition_bits"; + public static final String SPILL_STREAMING_AGG_MEM_LIMIT = "spill_streaming_agg_mem_limit"; public static final String MIN_REVOCABLE_MEM = "min_revocable_mem"; public static final String ENABLE_JOIN_SPILL = "enable_join_spill"; public static final String ENABLE_SORT_SPILL = "enable_sort_spill"; @@ -1688,9 +1689,14 @@ public class SessionVariable implements Serializable, Writable { // Set to 0 to disable; min: 128M public static final long MIN_EXTERNAL_AGG_BYTES_THRESHOLD = 134217728; @VariableMgr.VarAttr(name = EXTERNAL_AGG_BYTES_THRESHOLD, - checker = "checkExternalAggBytesThreshold", fuzzy = true) + checker = "checkExternalAggBytesThreshold", fuzzy = true, varType = VariableAnnotation.DEPRECATED) public long externalAggBytesThreshold = 0; + // The memory limit of streaming agg when spilling is enabled + // NOTE: streaming agg operator will not spill to disk. + @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT) + public long spillStreamingAggMemLimit = 268435456; //256MB + public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4; public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 20; @VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS, @@ -2988,6 +2994,8 @@ public class SessionVariable implements Serializable, Writable { tResult.setExternalAggBytesThreshold(0); // disable for now + tResult.setSpillStreamingAggMemLimit(spillStreamingAggMemLimit); + tResult.setExternalAggPartitionBits(externalAggPartitionBits); tResult.setEnableFileCache(enableFileCache); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index d87a0b13e5..153eaeb1fa 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -283,6 +283,8 @@ struct TQueryOptions { 103: optional bool enable_agg_spill = false 104: optional i64 min_revocable_mem = 0 + + 105: optional i64 spill_streaming_agg_mem_limit = 0; // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false