From c4e469c82c061d2501c9ba5bffc3387eb25467f2 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Thu, 20 Apr 2023 18:59:08 +0800 Subject: [PATCH] [feature](agg) Support spill to disk in aggregation (#18051) --- be/src/runtime/block_spill_manager.cpp | 2 +- be/src/runtime/runtime_state.h | 6 + .../aggregate_function_java_udaf.h | 1 + .../vec/common/hash_table/fixed_hash_table.h | 2 +- .../vec/common/hash_table/string_hash_table.h | 11 + be/src/vec/core/block_spill_reader.cpp | 39 +- be/src/vec/core/block_spill_reader.h | 6 + be/src/vec/core/block_spill_writer.cpp | 43 +- be/src/vec/core/block_spill_writer.h | 1 + be/src/vec/core/sort_cursor.h | 7 +- be/src/vec/exec/vaggregation_node.cpp | 410 +++++++++++++++- be/src/vec/exec/vaggregation_node.h | 446 +++++++----------- .../org/apache/doris/qe/SessionVariable.java | 43 ++ gensrc/thrift/PaloInternalService.thrift | 5 + 14 files changed, 683 insertions(+), 339 deletions(-) diff --git a/be/src/runtime/block_spill_manager.cpp b/be/src/runtime/block_spill_manager.cpp index b45e4c3ad6..ad0d711c80 100644 --- a/be/src/runtime/block_spill_manager.cpp +++ b/be/src/runtime/block_spill_manager.cpp @@ -128,7 +128,7 @@ Status BlockSpillManager::get_reader(int64_t stream_id, vectorized::BlockSpillRe std::string path; { std::lock_guard l(lock_); - DCHECK(id_to_file_paths_.end() != id_to_file_paths_.find(stream_id)); + CHECK(id_to_file_paths_.end() != id_to_file_paths_.find(stream_id)); path = id_to_file_paths_[stream_id]; } reader.reset(new vectorized::BlockSpillReader(stream_id, path, profile, delete_after_read)); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 105acf5918..bb29113529 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -379,6 +379,12 @@ public: void set_be_exec_version(int32_t version) noexcept { _query_options.be_exec_version = version; } + int64_t external_agg_bytes_threshold() const { + return _query_options.__isset.external_agg_bytes_threshold + ? 
_query_options.external_agg_bytes_threshold + : 0; + } + private: Status create_error_log_file(); diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index 1295e54fb8..ec2992b268 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -444,6 +444,7 @@ public: if (place == _exec_place) { this->data(_exec_place).destroy(); this->data(_exec_place).~Data(); + _first_created = true; } } diff --git a/be/src/vec/common/hash_table/fixed_hash_table.h b/be/src/vec/common/hash_table/fixed_hash_table.h index f47de257db..962beada45 100644 --- a/be/src/vec/common/hash_table/fixed_hash_table.h +++ b/be/src/vec/common/hash_table/fixed_hash_table.h @@ -201,7 +201,7 @@ public: free(); std::swap(buf, rhs.buf); - this->setSize(rhs.size()); + this->set_size(rhs.size()); Allocator::operator=(std::move(rhs)); Cell::State::operator=(std::move(rhs)); diff --git a/be/src/vec/common/hash_table/string_hash_table.h b/be/src/vec/common/hash_table/string_hash_table.h index 5bb871f98b..712ccb3810 100644 --- a/be/src/vec/common/hash_table/string_hash_table.h +++ b/be/src/vec/common/hash_table/string_hash_table.h @@ -476,8 +476,19 @@ public: m3(std::move(rhs.m3)), ms(std::move(rhs.ms)) {} + StringHashTable& operator=(StringHashTable&& other) { + std::swap(m0, other.m0); + std::swap(m1, other.m1); + std::swap(m2, other.m2); + std::swap(m3, other.m3); + std::swap(ms, other.ms); + return *this; + } + ~StringHashTable() = default; + size_t hash(doris::StringRef key) { return StringHashTableHash()(key); } + // Dispatch is written in a way that maximizes the performance: // 1. Always memcpy 8 times bytes // 2. 
Use switch case extension to generate fast dispatching table diff --git a/be/src/vec/core/block_spill_reader.cpp b/be/src/vec/core/block_spill_reader.cpp index a1f6e29abe..629ebc7487 100644 --- a/be/src/vec/core/block_spill_reader.cpp +++ b/be/src/vec/core/block_spill_reader.cpp @@ -41,6 +41,8 @@ namespace vectorized { void BlockSpillReader::_init_profile() { read_time_ = ADD_TIMER(profile_, "ReadTime"); deserialize_time_ = ADD_TIMER(profile_, "DeserializeTime"); + read_bytes_ = ADD_COUNTER(profile_, "ReadBytes", TUnit::BYTES); + read_block_num_ = ADD_COUNTER(profile_, "ReadBlockNum", TUnit::UNIT); } Status BlockSpillReader::open() { @@ -53,9 +55,6 @@ Status BlockSpillReader::open() { RETURN_IF_ERROR(FileFactory::create_file_reader(nullptr, system_properties, file_description, &file_system, &file_reader_)); - if (delete_after_read_) { - unlink(file_path_.c_str()); - } size_t file_size = file_reader_->size(); @@ -89,6 +88,12 @@ Status BlockSpillReader::open() { return Status::OK(); } +void BlockSpillReader::seek(size_t block_index) { + DCHECK(file_reader_ != nullptr); + DCHECK_LT(block_index, block_count_); + read_block_index_ = block_index; +} + // The returned block is owned by BlockSpillReader and is // destroyed when reading next block. 
Status BlockSpillReader::read(Block* block, bool* eos) { @@ -102,6 +107,12 @@ Status BlockSpillReader::read(Block* block, bool* eos) { size_t bytes_to_read = block_start_offsets_[read_block_index_ + 1] - block_start_offsets_[read_block_index_]; + + if (bytes_to_read == 0) { + ++read_block_index_; + COUNTER_UPDATE(read_block_num_, 1); + return Status::OK(); + } Slice result(read_buff_.get(), bytes_to_read); size_t bytes_read = 0; @@ -112,17 +123,23 @@ Status BlockSpillReader::read(Block* block, bool* eos) { &bytes_read)); } DCHECK(bytes_read == bytes_to_read); + COUNTER_UPDATE(read_bytes_, bytes_read); + COUNTER_UPDATE(read_block_num_, 1); - PBlock pb_block; - BlockUPtr new_block = nullptr; - { - SCOPED_TIMER(deserialize_time_); - if (!pb_block.ParseFromArray(result.data, result.size)) { - return Status::InternalError("Failed to read spilled block"); + if (bytes_read > 0) { + PBlock pb_block; + BlockUPtr new_block = nullptr; + { + SCOPED_TIMER(deserialize_time_); + if (!pb_block.ParseFromArray(result.data, result.size)) { + return Status::InternalError("Failed to read spilled block"); + } + new_block.reset(new Block(pb_block)); } - new_block.reset(new Block(pb_block)); + block->swap(*new_block); + } else { + block->clear_column_data(); } - block->swap(*new_block); ++read_block_index_; diff --git a/be/src/vec/core/block_spill_reader.h b/be/src/vec/core/block_spill_reader.h index a82024ca14..c87c13b56c 100644 --- a/be/src/vec/core/block_spill_reader.h +++ b/be/src/vec/core/block_spill_reader.h @@ -52,10 +52,14 @@ public: Status read(Block* block, bool* eos); + void seek(size_t block_index); + int64_t get_id() const { return stream_id_; } std::string get_path() const { return file_path_; } + size_t block_count() const { return block_count_; } + private: void _init_profile(); @@ -73,6 +77,8 @@ private: RuntimeProfile* profile_ = nullptr; RuntimeProfile::Counter* read_time_; RuntimeProfile::Counter* deserialize_time_; + RuntimeProfile::Counter* read_bytes_; + 
RuntimeProfile::Counter* read_block_num_; }; using BlockSpillReaderUPtr = std::unique_ptr; diff --git a/be/src/vec/core/block_spill_writer.cpp b/be/src/vec/core/block_spill_writer.cpp index 8694be20d3..86a59e9357 100644 --- a/be/src/vec/core/block_spill_writer.cpp +++ b/be/src/vec/core/block_spill_writer.cpp @@ -38,6 +38,7 @@ void BlockSpillWriter::_init_profile() { write_bytes_counter_ = ADD_COUNTER(profile_, "WriteBytes", TUnit::BYTES); write_timer_ = ADD_TIMER(profile_, "WriteTime"); serialize_timer_ = ADD_TIMER(profile_, "SerializeTime"); + write_blocks_num_ = ADD_COUNTER(profile_, "WriteBlockNum", TUnit::UNIT); } Status BlockSpillWriter::open() { @@ -77,10 +78,6 @@ Status BlockSpillWriter::close() { Status BlockSpillWriter::write(const Block& block) { auto rows = block.rows(); - if (0 == rows) { - return Status::OK(); - } - // file format: block1, block2, ..., blockn, meta if (rows <= batch_size_) { return _write_internal(block); @@ -119,33 +116,37 @@ Status BlockSpillWriter::_write_internal(const Block& block) { Status status; std::string buff; - PBlock pblock; - { - SCOPED_TIMER(serialize_timer_); - status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock, - &uncompressed_bytes, &compressed_bytes, - segment_v2::CompressionTypePB::LZ4); + if (block.rows() > 0) { + PBlock pblock; + { + SCOPED_TIMER(serialize_timer_); + status = block.serialize(BeExecVersionManager::get_newest_version(), &pblock, + &uncompressed_bytes, &compressed_bytes, + segment_v2::CompressionTypePB::LZ4); + if (!status.ok()) { + unlink(file_path_.c_str()); + return status; + } + pblock.SerializeToString(&buff); + } + + { + SCOPED_TIMER(write_timer_); + status = file_writer_->append(buff); + written_bytes = buff.size(); + } + if (!status.ok()) { unlink(file_path_.c_str()); return status; } - pblock.SerializeToString(&buff); - } - - { - SCOPED_TIMER(write_timer_); - status = file_writer_->append(buff); - written_bytes = buff.size(); - } - if (!status.ok()) { - 
unlink(file_path_.c_str()); - return status; } max_sub_block_size_ = std::max(max_sub_block_size_, written_bytes); meta_.append((const char*)&total_written_bytes_, sizeof(size_t)); COUNTER_UPDATE(write_bytes_counter_, written_bytes); + COUNTER_UPDATE(write_blocks_num_, 1); total_written_bytes_ += written_bytes; ++written_blocks_; diff --git a/be/src/vec/core/block_spill_writer.h b/be/src/vec/core/block_spill_writer.h index d3cf0b8479..1d41888e77 100644 --- a/be/src/vec/core/block_spill_writer.h +++ b/be/src/vec/core/block_spill_writer.h @@ -83,6 +83,7 @@ private: RuntimeProfile::Counter* write_bytes_counter_; RuntimeProfile::Counter* serialize_timer_; RuntimeProfile::Counter* write_timer_; + RuntimeProfile::Counter* write_blocks_num_; }; using BlockSpillWriterUPtr = std::unique_ptr; diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index aced2c022d..8c6accaba8 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -229,10 +229,13 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { bool has_next_block() override { _block.clear(); - auto status = _block_supplier(&_block, &_is_eof); + Status status; + do { + status = _block_supplier(&_block, &_is_eof); + } while (_block.empty() && !_is_eof && status.ok()); // If status not ok, upper callers could not detect whether it is eof or error. // So that fatal here, and should throw exception in the future. 
- if (status.ok() && !_is_eof) { + if (status.ok() && !_block.empty()) { if (_ordering_expr.size() > 0) { for (int i = 0; status.ok() && i < desc.size(); ++i) { // TODO yiguolei: throw exception if status not ok in the future diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 76bad7dc43..56ec4a585f 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -27,6 +27,7 @@ #include #include "exec/exec_node.h" +#include "runtime/block_spill_manager.h" #include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/memory/mem_tracker.h" @@ -162,10 +163,19 @@ Status AggregationNode::init(const TPlanNode& tnode, RuntimeState* state) { _aggregate_evaluators.push_back(evaluator); } - _partitioned_hash_table_enabled = state->partitioned_hash_agg_rows_threshold() > 0; - _agg_data->set_enable_partitioned_hash_table(_partitioned_hash_table_enabled); - const auto& agg_functions = tnode.agg_node.aggregate_functions; + _external_agg_bytes_threshold = state->external_agg_bytes_threshold(); + + if (_external_agg_bytes_threshold > 0) { + size_t spill_partition_count_bits = 4; + if (state->query_options().__isset.external_agg_partition_bits) { + spill_partition_count_bits = state->query_options().external_agg_partition_bits; + } + + _spill_partition_helper = + std::make_unique(spill_partition_count_bits); + } + _is_merge = std::any_of(agg_functions.cbegin(), agg_functions.cend(), [](const auto& e) { return e.nodes[0].agg_expr.is_merge_agg; }); return Status::OK(); @@ -343,9 +353,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); RETURN_IF_ERROR(VExpr::prepare(_probe_expr_ctxs, state, child(0)->row_desc())); - runtime_profile()->add_info_string("PartitionedHashTableEnabled", - _partitioned_hash_table_enabled ? 
"true" : "false"); - _agg_profile_arena = std::make_unique(); int j = _probe_expr_ctxs.size(); @@ -436,10 +443,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { ((_total_size_of_aggregate_states + _align_aggregate_states - 1) / _align_aggregate_states) * _align_aggregate_states)); - if constexpr (HashTableTraits::is_partitioned_table) { - agg_method.data.set_partitioned_threshold( - state->partitioned_hash_agg_rows_threshold()); - } }, _agg_data->_aggregated_method_variant); if (_is_merge) { @@ -590,9 +593,16 @@ Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block* bloc Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { if (in_block->rows() > 0) { RETURN_IF_ERROR(_executor.execute(in_block)); + RETURN_IF_ERROR(_try_spill_disk()); _executor.update_memusage(); } - if (eos) _can_read = true; + if (eos) { + if (_spill_context.has_data) { + _try_spill_disk(true); + RETURN_IF_ERROR(_spill_context.prepare_for_reading()); + } + _can_read = true; + } return Status::OK(); } @@ -605,11 +615,6 @@ void AggregationNode::release_resource(RuntimeState* state) { if (_hash_table_size_counter) { std::visit( [&](auto&& agg_method) { - using HashTableType = std::decay_t; - if constexpr (HashTableTraits::is_partitioned_table) { - COUNTER_UPDATE(_build_table_convert_timer, - agg_method.data.get_convert_timer_value()); - } COUNTER_SET(_hash_table_size_counter, int64_t(agg_method.data.size())); }, _agg_data->_aggregated_method_variant); @@ -870,6 +875,62 @@ bool AggregationNode::_should_expand_preagg_hash_tables() { _agg_data->_aggregated_method_variant); } +size_t AggregationNode::_memory_usage() const { + size_t usage = 0; + std::visit( + [&](auto&& agg_method) { + using HashMethodType = std::decay_t; + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + HashMethodType>::value) { + usage += agg_method.keys_memory_usage; + } + usage += agg_method.data.get_buffer_size_in_bytes(); + }, + 
_agg_data->_aggregated_method_variant); + + if (_agg_arena_pool) { + usage += _agg_arena_pool->size(); + } + + if (_aggregate_data_container) { + usage += _aggregate_data_container->memory_usage(); + } + + return usage; +} + +Status AggregationNode::_reset_hash_table() { + return std::visit( + [&](auto&& agg_method) { + auto& hash_table = agg_method.data; + using HashMethodType = std::decay_t; + using HashTableType = std::decay_t; + + if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits< + HashMethodType>::value) { + agg_method.reset(); + } + + hash_table.for_each_mapped([&](auto& mapped) { + if (mapped) { + _destroy_agg_status(mapped); + mapped = nullptr; + } + }); + + _aggregate_data_container.reset(new AggregateDataContainer( + sizeof(typename HashTableType::key_type), + ((_total_size_of_aggregate_states + _align_aggregate_states - 1) / + _align_aggregate_states) * + _align_aggregate_states)); + HashTableType new_hash_table; + hash_table = std::move(new_hash_table); + _agg_arena_pool.reset(new Arena); + return Status::OK(); + }, + _agg_data->_aggregated_method_variant); +} + size_t AggregationNode::_get_hash_table_size() { return std::visit([&](auto&& agg_method) { return agg_method.data.size(); }, _agg_data->_aggregated_method_variant); @@ -1047,8 +1108,13 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i RETURN_IF_ERROR(std::visit( [&](auto&& agg_method) -> Status { if (auto& hash_tbl = agg_method.data; hash_tbl.add_elem_size_overflow(rows)) { + /// If too much memory is used during the pre-aggregation stage, + /// it is better to output the data directly without performing further aggregation. 
+ const bool used_too_much_memory = + (_external_agg_bytes_threshold > 0 && + _memory_usage() > _external_agg_bytes_threshold); // do not try to do agg, just init and serialize directly return the out_block - if (!_should_expand_preagg_hash_tables()) { + if (!_should_expand_preagg_hash_tables() || used_too_much_memory) { SCOPED_TIMER(_streaming_agg_timer); ret_flag = true; @@ -1150,6 +1216,206 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i return Status::OK(); } +template +Status AggregationNode::_serialize_hash_table_to_block(HashTableCtxType& context, + HashTableType& hash_table, Block& block, + std::vector& keys_) { + int key_size = _probe_expr_ctxs.size(); + int agg_size = _aggregate_evaluators.size(); + + MutableColumns value_columns(agg_size); + DataTypes value_data_types(agg_size); + MutableColumns key_columns; + + for (int i = 0; i < key_size; ++i) { + key_columns.emplace_back(_probe_expr_ctxs[i]->root()->data_type()->create_column()); + } + + if (_use_fixed_length_serialization_opt) { + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + value_data_types[i] = _aggregate_evaluators[i]->function()->get_serialized_type(); + value_columns[i] = _aggregate_evaluators[i]->function()->create_serialize_column(); + } + } else { + auto serialize_string_type = std::make_shared(); + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + value_data_types[i] = serialize_string_type; + value_columns[i] = serialize_string_type->create_column(); + } + } + + context.init_once(); + const auto size = hash_table.size(); + std::vector keys(size); + if (_values.size() < size) { + _values.resize(size); + } + + size_t num_rows = 0; + _aggregate_data_container->init_once(); + auto& iter = _aggregate_data_container->iterator; + + { + while (iter != _aggregate_data_container->end()) { + keys[num_rows] = iter.get_key(); + _values[num_rows] = iter.get_aggregate_data(); + ++iter; + ++num_rows; + } + } + + { 
context.insert_keys_into_columns(keys, key_columns, num_rows, _probe_key_sz); } + + if (hash_table.has_null_key_data()) { + // only one key of group by support wrap null key + // here need additional processing logic on the null key / value + CHECK(key_columns.size() == 1); + CHECK(key_columns[0]->is_nullable()); + key_columns[0]->insert_data(nullptr, 0); + + // Here is no need to set `keys[num_rows]`, keep it as default value. + _values[num_rows] = hash_table.get_null_key_data(); + ++num_rows; + } + + if (_use_fixed_length_serialization_opt) { + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->serialize_to_column( + _values, _offsets_of_aggregate_states[i], value_columns[i], num_rows); + } + } else { + std::vector value_buffer_writers; + for (int i = 0; i < _aggregate_evaluators.size(); ++i) { + value_buffer_writers.emplace_back( + *reinterpret_cast(value_columns[i].get())); + } + for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { + _aggregate_evaluators[i]->function()->serialize_vec( + _values, _offsets_of_aggregate_states[i], value_buffer_writers[i], num_rows); + } + } + + ColumnsWithTypeAndName columns_with_schema; + for (int i = 0; i < key_size; ++i) { + columns_with_schema.emplace_back(std::move(key_columns[i]), + _probe_expr_ctxs[i]->root()->data_type(), + _probe_expr_ctxs[i]->root()->expr_name()); + } + for (int i = 0; i < agg_size; ++i) { + columns_with_schema.emplace_back(std::move(value_columns[i]), value_data_types[i], + _aggregate_evaluators[i]->function()->get_name()); + } + + block = columns_with_schema; + keys_.swap(keys); + return Status::OK(); +} + +template +Status AggregationNode::_spill_hash_table(HashTableCtxType& agg_method, HashTableType& hash_table) { + Block block; + std::vector keys; + RETURN_IF_ERROR(_serialize_hash_table_to_block(agg_method, hash_table, block, keys)); + CHECK_EQ(block.rows(), hash_table.size()); + CHECK_EQ(keys.size(), block.rows()); + + if 
(!_spill_context.has_data) { + _spill_context.has_data = true; + _spill_context.runtime_profile = _runtime_profile->create_child("Spill", true, true); + } + + BlockSpillWriterUPtr writer; + RETURN_IF_ERROR(ExecEnv::GetInstance()->block_spill_mgr()->get_writer( + std::numeric_limits::max(), writer, _spill_context.runtime_profile)); + Defer defer {[&]() { + // redundant call is ok + writer->close(); + }}; + _spill_context.stream_ids.emplace_back(writer->get_id()); + + std::vector partitioned_indices(block.rows()); + std::vector blocks_rows(_spill_partition_helper->partition_count); + + // The last row may contain a null key. + const size_t rows = hash_table.has_null_key_data() ? block.rows() - 1 : block.rows(); + for (size_t i = 0; i < rows; ++i) { + const auto index = _spill_partition_helper->get_index(hash_table.hash(keys[i])); + partitioned_indices[i] = index; + blocks_rows[index]++; + } + + if (hash_table.has_null_key_data()) { + // Here put the row with null key at the last partition. + const auto index = _spill_partition_helper->partition_count - 1; + partitioned_indices[rows] = index; + blocks_rows[index]++; + } + + for (size_t i = 0; i < _spill_partition_helper->partition_count; ++i) { + Block block_to_write = block.clone_empty(); + if (blocks_rows[i] == 0) { + /// Here write one empty block to ensure there are enough blocks in the file, + /// blocks' count should be equal with partition_count. 
+ writer->write(block_to_write); + continue; + } + + MutableBlock mutable_block(std::move(block_to_write)); + + for (auto& column : mutable_block.mutable_columns()) { + column->reserve(blocks_rows[i]); + } + + size_t begin = 0; + size_t length = 0; + for (size_t j = 0; j < partitioned_indices.size(); ++j) { + if (partitioned_indices[j] != i) { + if (length > 0) { + mutable_block.add_rows(&block, begin, length); + } + length = 0; + continue; + } + + if (length == 0) { + begin = j; + } + length++; + } + + if (length > 0) { + mutable_block.add_rows(&block, begin, length); + } + + CHECK_EQ(mutable_block.rows(), blocks_rows[i]); + RETURN_IF_ERROR(writer->write(mutable_block.to_block())); + } + RETURN_IF_ERROR(writer->close()); + + return Status::OK(); +} + +Status AggregationNode::_try_spill_disk(bool eos) { + if (_external_agg_bytes_threshold == 0) { + return Status::OK(); + } + return std::visit( + [&](auto&& agg_method) -> Status { + auto& hash_table = agg_method.data; + if (!eos && _memory_usage() < _external_agg_bytes_threshold) { + return Status::OK(); + } + + if (_get_hash_table_size() == 0) { + return Status::OK(); + } + + RETURN_IF_ERROR(_spill_hash_table(agg_method, hash_table)); + return _reset_hash_table(); + }, + _agg_data->_aggregated_method_variant); +} + Status AggregationNode::_execute_with_serialized_key(Block* block) { if (_reach_limit) { return _execute_with_serialized_key_helper(block); @@ -1158,25 +1424,76 @@ Status AggregationNode::_execute_with_serialized_key(Block* block) { } } +Status AggregationNode::_merge_spilt_data() { + CHECK(!_spill_context.stream_ids.empty()); + + for (auto& reader : _spill_context.readers) { + CHECK_LT(_spill_context.read_cursor, reader->block_count()); + reader->seek(_spill_context.read_cursor); + Block block; + bool eos; + RETURN_IF_ERROR(reader->read(&block, &eos)); + + if (!block.empty()) { + auto st = _merge_with_serialized_key_helper( + &block); + RETURN_IF_ERROR(st); + } + } + _spill_context.read_cursor++; + 
return Status::OK(); +} + +Status AggregationNode::_get_result_with_spilt_data(RuntimeState* state, Block* block, bool* eos) { + CHECK(!_spill_context.stream_ids.empty()); + CHECK(_spill_partition_helper != nullptr) << "_spill_partition_helper should not be null"; + _aggregate_data_container->init_once(); + while (_aggregate_data_container->iterator == _aggregate_data_container->end()) { + if (_spill_context.read_cursor == _spill_partition_helper->partition_count) { + break; + } + RETURN_IF_ERROR(_reset_hash_table()); + RETURN_IF_ERROR(_merge_spilt_data()); + _aggregate_data_container->init_once(); + } + + RETURN_IF_ERROR(_get_result_with_serialized_key_non_spill(state, block, eos)); + if (*eos) { + *eos = _spill_context.read_cursor == _spill_partition_helper->partition_count; + } + CHECK(!block->empty() || *eos); + return Status::OK(); +} + Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos) { + if (_spill_context.has_data) { + return _get_result_with_spilt_data(state, block, eos); + } else { + return _get_result_with_serialized_key_non_spill(state, block, eos); + } +} + +Status AggregationNode::_get_result_with_serialized_key_non_spill(RuntimeState* state, Block* block, + bool* eos) { // non-nullable column(id in `_make_nullable_keys`) will be converted to nullable. 
bool mem_reuse = _make_nullable_keys.empty() && block->mem_reuse(); - auto column_withschema = VectorizedUtils::create_columns_with_type_and_name(_row_descriptor); + + auto columns_with_schema = VectorizedUtils::create_columns_with_type_and_name(_row_descriptor); int key_size = _probe_expr_ctxs.size(); MutableColumns key_columns; for (int i = 0; i < key_size; ++i) { if (!mem_reuse) { - key_columns.emplace_back(column_withschema[i].type->create_column()); + key_columns.emplace_back(columns_with_schema[i].type->create_column()); } else { key_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); } } MutableColumns value_columns; - for (int i = key_size; i < column_withschema.size(); ++i) { + for (int i = key_size; i < columns_with_schema.size(); ++i) { if (!mem_reuse) { - value_columns.emplace_back(column_withschema[i].type->create_column()); + value_columns.emplace_back(columns_with_schema[i].type->create_column()); } else { value_columns.emplace_back(std::move(*block->get_by_position(i).column).mutate()); } @@ -1243,7 +1560,7 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo _agg_data->_aggregated_method_variant); if (!mem_reuse) { - *block = column_withschema; + *block = columns_with_schema; MutableColumns columns(block->columns()); for (int i = 0; i < block->columns(); ++i) { if (i < key_size) { @@ -1260,6 +1577,37 @@ Status AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos) { + if (_spill_context.has_data) { + return _serialize_with_serialized_key_result_with_spilt_data(state, block, eos); + } else { + return _serialize_with_serialized_key_result_non_spill(state, block, eos); + } +} + +Status AggregationNode::_serialize_with_serialized_key_result_with_spilt_data(RuntimeState* state, + Block* block, + bool* eos) { + CHECK(!_spill_context.stream_ids.empty()); + 
CHECK(_spill_partition_helper != nullptr) << "_spill_partition_helper should not be null"; + _aggregate_data_container->init_once(); + while (_aggregate_data_container->iterator == _aggregate_data_container->end()) { + if (_spill_context.read_cursor == _spill_partition_helper->partition_count) { + break; + } + RETURN_IF_ERROR(_reset_hash_table()); + RETURN_IF_ERROR(_merge_spilt_data()); + _aggregate_data_container->init_once(); + } + + RETURN_IF_ERROR(_serialize_with_serialized_key_result_non_spill(state, block, eos)); + if (*eos) { + *eos = _spill_context.read_cursor == _spill_partition_helper->partition_count; + } + CHECK(!block->empty() || *eos); + return Status::OK(); +} +Status AggregationNode::_serialize_with_serialized_key_result_non_spill(RuntimeState* state, + Block* block, bool* eos) { SCOPED_TIMER(_serialize_result_timer); int key_size = _probe_expr_ctxs.size(); int agg_size = _aggregate_evaluators.size(); @@ -1387,9 +1735,9 @@ Status AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat Status AggregationNode::_merge_with_serialized_key(Block* block) { if (_reach_limit) { - return _merge_with_serialized_key_helper(block); + return _merge_with_serialized_key_helper(block); } else { - return _merge_with_serialized_key_helper(block); + return _merge_with_serialized_key_helper(block); } } @@ -1452,4 +1800,18 @@ void AggregationNode::_release_mem() { _values.swap(tmp_values); } +Status AggSpillContext::prepare_for_reading() { + if (readers_prepared) { + return Status::OK(); + } + readers_prepared = true; + + readers.resize(stream_ids.size()); + auto* manager = ExecEnv::GetInstance()->block_spill_mgr(); + for (size_t i = 0; i != stream_ids.size(); ++i) { + RETURN_IF_ERROR(manager->get_reader(stream_ids[i], readers[i], runtime_profile, true)); + } + return Status::OK(); +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index aa9b149ef2..951152a045 100644 --- 
a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -59,6 +59,8 @@ #include "vec/common/string_ref.h" #include "vec/common/uint128.h" #include "vec/core/block.h" +#include "vec/core/block_spill_reader.h" +#include "vec/core/block_spill_writer.h" #include "vec/core/column_with_type_and_name.h" #include "vec/core/types.h" #include "vec/exprs/vectorized_agg_fn.h" @@ -172,6 +174,12 @@ struct AggregationMethodSerialized { } } + void reset() { + _arena.reset(); + keys_memory_usage = 0; + _serialized_key_buffer_size = 0; + } + private: size_t _serialized_key_buffer_size; uint8_t* _serialized_key_buffer; @@ -182,8 +190,6 @@ private: using AggregatedDataWithoutKey = AggregateDataPtr; using AggregatedDataWithStringKey = PHHashMap>; -using PartitionedAggregatedDataWithStringKey = - PHPartitionedHashMap>; using AggregatedDataWithShortStringKey = StringHashMap; template @@ -443,23 +449,6 @@ using AggregatedDataWithUInt128KeyPhase2 = using AggregatedDataWithUInt256KeyPhase2 = PHHashMap>; -using PartitionedAggregatedDataWithUInt32Key = - PHPartitionedHashMap>; -using PartitionedAggregatedDataWithUInt64Key = - PHPartitionedHashMap>; -using PartitionedAggregatedDataWithUInt128Key = - PHPartitionedHashMap>; -using PartitionedAggregatedDataWithUInt256Key = - PHPartitionedHashMap>; -using PartitionedAggregatedDataWithUInt32KeyPhase2 = - PHPartitionedHashMap>; -using PartitionedAggregatedDataWithUInt64KeyPhase2 = - PHPartitionedHashMap>; -using PartitionedAggregatedDataWithUInt128KeyPhase2 = - PHPartitionedHashMap>; -using PartitionedAggregatedDataWithUInt256KeyPhase2 = - PHPartitionedHashMap>; - using AggregatedDataWithNullableUInt8Key = AggregationDataWithNullKey; using AggregatedDataWithNullableUInt16Key = AggregationDataWithNullKey; using AggregatedDataWithNullableUInt32Key = AggregationDataWithNullKey; @@ -475,19 +464,6 @@ using AggregatedDataWithNullableUInt128Key = using AggregatedDataWithNullableUInt128KeyPhase2 = AggregationDataWithNullKey; 
-using PartitionedAggregatedDataWithNullableUInt32Key = - AggregationDataWithNullKey; -using PartitionedAggregatedDataWithNullableUInt64Key = - AggregationDataWithNullKey; -using PartitionedAggregatedDataWithNullableUInt32KeyPhase2 = - AggregationDataWithNullKey; -using PartitionedAggregatedDataWithNullableUInt64KeyPhase2 = - AggregationDataWithNullKey; -using PartitionedAggregatedDataWithNullableUInt128Key = - AggregationDataWithNullKey; -using PartitionedAggregatedDataWithNullableUInt128KeyPhase2 = - AggregationDataWithNullKey; - using AggregatedMethodVariants = std::variant< AggregationMethodSerialized, AggregationMethodOneNumber, @@ -528,38 +504,7 @@ using AggregatedMethodVariants = std::variant< AggregationMethodKeysFixed, AggregationMethodKeysFixed, AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodSerialized, - AggregationMethodOneNumber, - AggregationMethodOneNumber, - AggregationMethodOneNumber, - AggregationMethodOneNumber, - AggregationMethodOneNumber, - AggregationMethodOneNumber, - AggregationMethodSingleNullableColumn< - AggregationMethodOneNumber>, - AggregationMethodSingleNullableColumn< - AggregationMethodOneNumber>, - AggregationMethodSingleNullableColumn>, - AggregationMethodSingleNullableColumn>, - AggregationMethodSingleNullableColumn>, - AggregationMethodSingleNullableColumn>, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed, - AggregationMethodKeysFixed>; + AggregationMethodKeysFixed>; struct AggregatedDataVariants { AggregatedDataVariants() = default; @@ -567,7 +512,6 @@ struct AggregatedDataVariants { AggregatedDataVariants& operator=(const AggregatedDataVariants&) = delete; AggregatedDataWithoutKey without_key = nullptr; 
AggregatedMethodVariants _aggregated_method_variant; - bool _enable_partitioned_hash_table = false; // TODO: may we should support uint256 in the future enum class Type { @@ -593,23 +537,14 @@ struct AggregatedDataVariants { Type _type = Type::EMPTY; - void set_enable_partitioned_hash_table(bool enabled) { - _enable_partitioned_hash_table = enabled; - } - void init(Type type, bool is_nullable = false) { _type = type; switch (_type) { case Type::without_key: break; case Type::serialized: - if (_enable_partitioned_hash_table) { - _aggregated_method_variant.emplace< - AggregationMethodSerialized>(); - } else { - _aggregated_method_variant - .emplace>(); - } + _aggregated_method_variant + .emplace>(); break; case Type::int8_key: if (is_nullable) { @@ -630,246 +565,123 @@ struct AggregatedDataVariants { } break; case Type::int32_key: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace>(); - } + if (is_nullable) { + _aggregated_method_variant.emplace>>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodOneNumber>(); - } + _aggregated_method_variant + .emplace>(); } break; case Type::int32_key_phase2: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace>(); - } + if (is_nullable) { + _aggregated_method_variant + .emplace>>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace>(); - } + _aggregated_method_variant.emplace< + AggregationMethodOneNumber>(); } break; case Type::int64_key: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - 
_aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace>(); - } + if (is_nullable) { + _aggregated_method_variant.emplace>>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodOneNumber>(); - } + _aggregated_method_variant + .emplace>(); } break; case Type::int64_key_phase2: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace>(); - } + if (is_nullable) { + _aggregated_method_variant + .emplace>>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace>(); - } + _aggregated_method_variant.emplace< + AggregationMethodOneNumber>(); } break; case Type::int128_key: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace>(); - } + if (is_nullable) { + _aggregated_method_variant + .emplace>>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodOneNumber>(); - } + _aggregated_method_variant.emplace< + AggregationMethodOneNumber>(); } break; case Type::int128_key_phase2: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - _aggregated_method_variant.emplace>(); - } + if (is_nullable) { + _aggregated_method_variant + .emplace>>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodSingleNullableColumn>>(); - } else { - 
_aggregated_method_variant.emplace>(); - } + _aggregated_method_variant.emplace< + AggregationMethodOneNumber>(); } break; case Type::int64_keys: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace>(); - } else { - _aggregated_method_variant.emplace>(); - } + if (is_nullable) { + _aggregated_method_variant + .emplace>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } + _aggregated_method_variant + .emplace>(); } break; case Type::int64_keys_phase2: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace>(); - } else { - _aggregated_method_variant.emplace>(); - } + + if (is_nullable) { + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed>(); } + break; case Type::int128_keys: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace>(); - } else { - _aggregated_method_variant.emplace>(); - } + + if (is_nullable) { + _aggregated_method_variant + .emplace>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } + _aggregated_method_variant + .emplace>(); } break; case Type::int128_keys_phase2: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace>(); - } else { - _aggregated_method_variant.emplace>(); - } + + if (is_nullable) { + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< 
- AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace>(); - } + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed>(); } + break; case Type::int256_keys: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace>(); - } else { - _aggregated_method_variant.emplace>(); - } + + if (is_nullable) { + _aggregated_method_variant + .emplace>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } + _aggregated_method_variant + .emplace>(); } + break; case Type::int256_keys_phase2: - if (_enable_partitioned_hash_table) { - if (is_nullable) { - _aggregated_method_variant.emplace>(); - } else { - _aggregated_method_variant.emplace>(); - } + + if (is_nullable) { + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed>(); } else { - if (is_nullable) { - _aggregated_method_variant.emplace< - AggregationMethodKeysFixed>(); - } else { - _aggregated_method_variant.emplace>(); - } + _aggregated_method_variant.emplace< + AggregationMethodKeysFixed>(); } break; case Type::string_key: @@ -1012,6 +824,44 @@ private: bool _inited = false; }; +struct AggSpillContext { + bool has_data = false; + bool readers_prepared = false; + + /// stream ids of writers/readers + std::vector stream_ids; + std::vector readers; + RuntimeProfile* runtime_profile = nullptr; /* default-initialized like the other members so an unassigned context never holds an indeterminate pointer */ + + size_t read_cursor {}; + + Status prepare_for_reading(); + + ~AggSpillContext() { + for (auto& reader : readers) { + if (reader) { + reader->close(); + reader.reset(); + } + } + } +}; + +struct SpillPartitionHelper { + const size_t partition_count_bits; + const size_t partition_count; + const size_t max_partition_index; + + SpillPartitionHelper(const size_t partition_count_bits_) + : partition_count_bits(partition_count_bits_), + partition_count(1 << partition_count_bits), + max_partition_index(partition_count - 1) {} + + 
size_t get_index(size_t hash_value) const { + return (hash_value >> (32 - partition_count_bits)) & max_partition_index; + } +}; + // not support spill class AggregationNode final : public ::doris::ExecNode { public: @@ -1057,7 +907,6 @@ private: bool _is_merge; bool _is_first_phase; bool _use_fixed_length_serialization_opt; - bool _partitioned_hash_table_enabled; std::unique_ptr _agg_profile_arena; size_t _align_aggregate_states = 1; @@ -1066,8 +915,14 @@ private: /// The total size of the row from the aggregate functions. size_t _total_size_of_aggregate_states = 0; + size_t _external_agg_bytes_threshold = 0; /* default-initialized for consistency with the neighbouring size_t members */ + size_t _partitioned_threshold = 0; + AggregatedDataVariantsUPtr _agg_data; + AggSpillContext _spill_context; + std::unique_ptr _spill_partition_helper; + ArenaUPtr _agg_arena_pool; RuntimeProfile::Counter* _build_timer; @@ -1127,7 +982,18 @@ private: void _close_without_key(); Status _get_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos); + Status _get_result_with_serialized_key_non_spill(RuntimeState* state, Block* block, bool* eos); + + Status _merge_spilt_data(); + + Status _get_result_with_spilt_data(RuntimeState* state, Block* block, bool* eos); + Status _serialize_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos); + Status _serialize_with_serialized_key_result_non_spill(RuntimeState* state, Block* block, + bool* eos); + Status _serialize_with_serialized_key_result_with_spilt_data(RuntimeState* state, Block* block, + bool* eos); + Status _pre_agg_with_serialized_key(Block* in_block, Block* out_block); Status _execute_with_serialized_key(Block* block); Status _merge_with_serialized_key(Block* block); @@ -1209,7 +1075,7 @@ private: return ((VSlotRef*)ctxs[0]->root())->column_id(); } - template + template Status _merge_with_serialized_key_helper(Block* block) { SCOPED_TIMER(_merge_timer); @@ -1217,10 +1083,14 @@ private: ColumnRawPtrs key_columns(key_size); for (size_t i = 0; i < key_size; ++i) { - int 
result_column_id = -1; - RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block, &result_column_id)); - block->replace_by_position_if_const(result_column_id); - key_columns[i] = block->get_by_position(result_column_id).column.get(); + if constexpr (for_spill) { + key_columns[i] = block->get_by_position(i).column.get(); + } else { + int result_column_id = -1; + RETURN_IF_ERROR(_probe_expr_ctxs[i]->execute(block, &result_column_id)); + block->replace_by_position_if_const(result_column_id); + key_columns[i] = block->get_by_position(result_column_id).column.get(); + } } int rows = block->rows(); @@ -1272,8 +1142,13 @@ private: _emplace_into_hash_table(_places.data(), key_columns, rows); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { - if (_aggregate_evaluators[i]->is_merge()) { - int col_id = _get_slot_column_id(_aggregate_evaluators[i]); + if (_aggregate_evaluators[i]->is_merge() || for_spill) { + int col_id; + if constexpr (for_spill) { + col_id = _probe_expr_ctxs.size() + i; + } else { + col_id = _get_slot_column_id(_aggregate_evaluators[i]); + } auto column = block->get_by_position(col_id).column; if (column->is_nullable()) { column = ((ColumnNullable*)column.get())->get_nested_column_ptr(); @@ -1320,6 +1195,19 @@ private: void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, const size_t num_rows); + size_t _memory_usage() const; + + Status _reset_hash_table(); + + Status _try_spill_disk(bool eos = false); + + template + Status _serialize_hash_table_to_block(HashTableCtxType& context, HashTableType& hash_table, + Block& block, std::vector& keys); + + template + Status _spill_hash_table(HashTableCtxType& agg_method, HashTableType& hash_table); + void _find_in_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, size_t num_rows); void release_tracker(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 
c7d4880553..717969b225 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -277,6 +277,8 @@ public class SessionVariable implements Serializable, Writable { public static final String GROUP_CONCAT_MAX_LEN = "group_concat_max_len"; public static final String EXTERNAL_SORT_BYTES_THRESHOLD = "external_sort_bytes_threshold"; + public static final String EXTERNAL_AGG_BYTES_THRESHOLD = "external_agg_bytes_threshold"; + public static final String EXTERNAL_AGG_PARTITION_BITS = "external_agg_partition_bits"; public static final String ENABLE_TWO_PHASE_READ_OPT = "enable_two_phase_read_opt"; public static final String TOPN_OPT_LIMIT_THRESHOLD = "topn_opt_limit_threshold"; @@ -761,6 +763,18 @@ public class SessionVariable implements Serializable, Writable { checker = "checkExternalSortBytesThreshold", fuzzy = true) public long externalSortBytesThreshold = 0; + // Set to 0 to disable; min: 128M + public static final long MIN_EXTERNAL_AGG_BYTES_THRESHOLD = 134217728; + @VariableMgr.VarAttr(name = EXTERNAL_AGG_BYTES_THRESHOLD, + checker = "checkExternalAggBytesThreshold", fuzzy = true) + public long externalAggBytesThreshold = 0; + + public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4; + public static final int MAX_EXTERNAL_AGG_PARTITION_BITS = 8; + @VariableMgr.VarAttr(name = EXTERNAL_AGG_PARTITION_BITS, + checker = "checkExternalAggPartitionBits", fuzzy = true) + public int externalAggPartitionBits = 8; // means that the hash table will be partitioned into 256 blocks. + // Whether enable two phase read optimization // 1. read related rowids along with necessary column data // 2. 
spawn fetch RPC to other nodes to get related data by sorted rowids @@ -826,15 +840,22 @@ public class SessionVariable implements Serializable, Writable { switch (randomInt) { case 0: this.externalSortBytesThreshold = 0; + this.externalAggBytesThreshold = 0; break; case 1: this.externalSortBytesThreshold = 1; + this.externalAggBytesThreshold = 1; + this.externalAggPartitionBits = 6; break; case 2: this.externalSortBytesThreshold = 1024 * 1024; + this.externalAggBytesThreshold = 1024 * 1024; + this.externalAggPartitionBits = 8; break; default: this.externalSortBytesThreshold = 100 * 1024 * 1024 * 1024; + this.externalAggBytesThreshold = 100L * 1024 * 1024 * 1024; /* 100L keeps the product in long arithmetic; the int product wraps to 0 */ + this.externalAggPartitionBits = 4; break; } // pull_request_id default value is 0 @@ -1601,6 +1622,24 @@ public class SessionVariable implements Serializable, Writable { } } + public void checkExternalAggBytesThreshold(String externalAggBytesThreshold) { + long value = Long.parseLong(externalAggBytesThreshold); + if (value > 0 && value < MIN_EXTERNAL_AGG_BYTES_THRESHOLD) { + LOG.warn("external agg bytes threshold: {}, min: {}", value, MIN_EXTERNAL_AGG_BYTES_THRESHOLD); + throw new UnsupportedOperationException("minimum value is " + MIN_EXTERNAL_AGG_BYTES_THRESHOLD); + } + } + + public void checkExternalAggPartitionBits(String externalAggPartitionBits) { + int value = Integer.parseInt(externalAggPartitionBits); + if (value < MIN_EXTERNAL_AGG_PARTITION_BITS || value > MAX_EXTERNAL_AGG_PARTITION_BITS) { + LOG.warn("external agg partition bits: {}, min: {}, max: {}", + value, MIN_EXTERNAL_AGG_PARTITION_BITS, MAX_EXTERNAL_AGG_PARTITION_BITS); + throw new UnsupportedOperationException("min value is " + MIN_EXTERNAL_AGG_PARTITION_BITS + " max value is " + + MAX_EXTERNAL_AGG_PARTITION_BITS); + } + } + public boolean isEnableFileCache() { return enableFileCache; } @@ -1690,6 +1729,10 @@ public class SessionVariable implements Serializable, Writable { tResult.setExternalSortBytesThreshold(externalSortBytesThreshold); 
+ tResult.setExternalAggBytesThreshold(externalAggBytesThreshold); + + tResult.setExternalAggPartitionBits(externalAggPartitionBits); + tResult.setEnableFileCache(enableFileCache); if (dryRunQuery) { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 02949dfa94..111718eb1b 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -192,6 +192,7 @@ struct TQueryOptions { 59: optional i64 external_sort_bytes_threshold = 0 + // deprecated 60: optional i32 partitioned_hash_agg_rows_threshold = 0 61: optional bool enable_file_cache = true @@ -207,6 +208,10 @@ struct TQueryOptions { 66: optional i32 parallel_instance = 1 // Indicate where useServerPrepStmts enabled 67: optional bool mysql_row_binary_format = false; + 68: optional i64 external_agg_bytes_threshold = 0 + + // Number of partitions (1 << external_agg_partition_bits) used when aggregation data is spilled to disk + 69: optional i32 external_agg_partition_bits = 4 }