From 509689491f0e9d3258131f6bc29fa86b375b9e9d Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Sat, 27 May 2023 22:42:10 +0800 Subject: [PATCH] [improvement](exec) Refactor the partition sort node to send data in pipeline mode (#20128) before: the node will wait to retrieve all data from child, then send data to parent. now: for data from child that does not require sorting, it can be sent to parent immediately. --- be/src/vec/exec/vpartition_sort_node.cpp | 44 ++++++++++++++++++------ be/src/vec/exec/vpartition_sort_node.h | 3 ++ 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/be/src/vec/exec/vpartition_sort_node.cpp b/be/src/vec/exec/vpartition_sort_node.cpp index 8f3f50b9d4..cb3b199285 100644 --- a/be/src/vec/exec/vpartition_sort_node.cpp +++ b/be/src/vec/exec/vpartition_sort_node.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -180,9 +181,11 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl _value_places[0]->append_whole_block(input_block, child(0)->row_desc()); } else { //just simply use partition num to check - //TODO: here could set can read to true directly. need mutex if (_num_partition > 512 && child_input_rows < 10000 * _num_partition) { - _blocks_buffer.push(std::move(*input_block)); + { + std::lock_guard lock(_buffer_mutex); + _blocks_buffer.push(std::move(*input_block)); + } } else { RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size())); RETURN_IF_CANCELLED(state); @@ -219,6 +222,7 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl debug_profile(); } COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition)); + //so all data from child have sink completed _can_read = true; } return Status::OK(); @@ -257,17 +261,32 @@ Status VPartitionSortNode::alloc_resource(RuntimeState* state) { return Status::OK(); } +bool VPartitionSortNode::can_read() { + std::lock_guard lock(_buffer_mutex); + return !_blocks_buffer.empty() || _can_read; +} + Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) { RETURN_IF_CANCELLED(state); output_block->clear_column_data(); - bool current_eos = false; - RETURN_IF_ERROR(get_sorted_block(state, output_block, ¤t_eos)); - if (_sort_idx >= _partition_sorts.size() && output_block->rows() == 0) { + { + std::lock_guard lock(_buffer_mutex); if (_blocks_buffer.empty() == false) { _blocks_buffer.front().swap(*output_block); _blocks_buffer.pop(); - } else { + return Status::OK(); + } + } + + if (_can_read) { + bool current_eos = false; + RETURN_IF_ERROR(get_sorted_block(state, output_block, ¤t_eos)); + } + { + std::lock_guard lock(_buffer_mutex); + if (_blocks_buffer.empty() && _sort_idx >= _partition_sorts.size()) { + _can_read = false; *eos = true; } } @@ -309,6 +328,9 @@ Status VPartitionSortNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } + if (state->enable_profile()) { + debug_profile(); + } return ExecNode::close(state); } @@ -435,20 +457,22 @@ void VPartitionSortNode::debug_profile() { fmt::format_to(partition_blocks_read, "["); for (auto place : _value_places) { fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows()); - fmt::format_to(partition_rows_read, "{}, ", place->blocks.size()); + fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size()); } fmt::format_to(partition_rows_read, "]"); fmt::format_to(partition_blocks_read, "]"); - runtime_profile()->add_info_string("PerPartitionBlocksRead", partition_blocks_read.data()); - runtime_profile()->add_info_string("PerPartitionRowsRead", partition_rows_read.data()); + runtime_profile()->add_info_string("PerPartitionBlocksRead", + fmt::to_string(partition_blocks_read)); + runtime_profile()->add_info_string("PerPartitionRowsRead", fmt::to_string(partition_rows_read)); fmt::memory_buffer partition_output_rows; fmt::format_to(partition_output_rows, "["); for (auto row : partition_profile_output_rows) { fmt::format_to(partition_output_rows, "{}, ", row); } fmt::format_to(partition_output_rows, "]"); - runtime_profile()->add_info_string("PerPartitionOutputRows", partition_output_rows.data()); + runtime_profile()->add_info_string("PerPartitionOutputRows", + fmt::to_string(partition_output_rows)); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/vpartition_sort_node.h b/be/src/vec/exec/vpartition_sort_node.h index 4143b19dc9..0b24ce8378 100644 --- a/be/src/vec/exec/vpartition_sort_node.h +++ b/be/src/vec/exec/vpartition_sort_node.h @@ -21,6 +21,7 @@ #include #include +#include #include "exec/exec_node.h" #include "vec/columns/column.h" @@ -329,6 +330,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override; void debug_profile(); + bool can_read(); private: template @@ -370,6 +372,7 @@ private: std::unique_ptr _previous_row = nullptr; std::queue _blocks_buffer; int64_t child_input_rows = 0; + std::mutex _buffer_mutex; RuntimeProfile::Counter* _build_timer; RuntimeProfile::Counter* _emplace_key_timer;