[improvement](exec) Refactor the partition sort node to send data in pipeline mode (#20128)
before: the node will wait to retrieve all data from child, then send data to parent. now: for data from child that does not require sorting, it can be sent to parent immediately.
This commit is contained in:
@ -22,6 +22,7 @@
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
@ -180,9 +181,11 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl
|
||||
_value_places[0]->append_whole_block(input_block, child(0)->row_desc());
|
||||
} else {
|
||||
//just simply use partition num to check
|
||||
//TODO: here could set can read to true directly. need mutex
|
||||
if (_num_partition > 512 && child_input_rows < 10000 * _num_partition) {
|
||||
_blocks_buffer.push(std::move(*input_block));
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_buffer_mutex);
|
||||
_blocks_buffer.push(std::move(*input_block));
|
||||
}
|
||||
} else {
|
||||
RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size()));
|
||||
RETURN_IF_CANCELLED(state);
|
||||
@ -219,6 +222,7 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl
|
||||
debug_profile();
|
||||
}
|
||||
COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition));
|
||||
//so all data from child have sink completed
|
||||
_can_read = true;
|
||||
}
|
||||
return Status::OK();
|
||||
@ -257,17 +261,32 @@ Status VPartitionSortNode::alloc_resource(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
bool VPartitionSortNode::can_read() {
|
||||
std::lock_guard<std::mutex> lock(_buffer_mutex);
|
||||
return !_blocks_buffer.empty() || _can_read;
|
||||
}
|
||||
|
||||
Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block,
|
||||
bool* eos) {
|
||||
RETURN_IF_CANCELLED(state);
|
||||
output_block->clear_column_data();
|
||||
bool current_eos = false;
|
||||
RETURN_IF_ERROR(get_sorted_block(state, output_block, ¤t_eos));
|
||||
if (_sort_idx >= _partition_sorts.size() && output_block->rows() == 0) {
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_buffer_mutex);
|
||||
if (_blocks_buffer.empty() == false) {
|
||||
_blocks_buffer.front().swap(*output_block);
|
||||
_blocks_buffer.pop();
|
||||
} else {
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
||||
if (_can_read) {
|
||||
bool current_eos = false;
|
||||
RETURN_IF_ERROR(get_sorted_block(state, output_block, ¤t_eos));
|
||||
}
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_buffer_mutex);
|
||||
if (_blocks_buffer.empty() && _sort_idx >= _partition_sorts.size()) {
|
||||
_can_read = false;
|
||||
*eos = true;
|
||||
}
|
||||
}
|
||||
@ -309,6 +328,9 @@ Status VPartitionSortNode::close(RuntimeState* state) {
|
||||
if (is_closed()) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (state->enable_profile()) {
|
||||
debug_profile();
|
||||
}
|
||||
return ExecNode::close(state);
|
||||
}
|
||||
|
||||
@ -435,20 +457,22 @@ void VPartitionSortNode::debug_profile() {
|
||||
fmt::format_to(partition_blocks_read, "[");
|
||||
for (auto place : _value_places) {
|
||||
fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows());
|
||||
fmt::format_to(partition_rows_read, "{}, ", place->blocks.size());
|
||||
fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size());
|
||||
}
|
||||
fmt::format_to(partition_rows_read, "]");
|
||||
fmt::format_to(partition_blocks_read, "]");
|
||||
|
||||
runtime_profile()->add_info_string("PerPartitionBlocksRead", partition_blocks_read.data());
|
||||
runtime_profile()->add_info_string("PerPartitionRowsRead", partition_rows_read.data());
|
||||
runtime_profile()->add_info_string("PerPartitionBlocksRead",
|
||||
fmt::to_string(partition_blocks_read));
|
||||
runtime_profile()->add_info_string("PerPartitionRowsRead", fmt::to_string(partition_rows_read));
|
||||
fmt::memory_buffer partition_output_rows;
|
||||
fmt::format_to(partition_output_rows, "[");
|
||||
for (auto row : partition_profile_output_rows) {
|
||||
fmt::format_to(partition_output_rows, "{}, ", row);
|
||||
}
|
||||
fmt::format_to(partition_output_rows, "]");
|
||||
runtime_profile()->add_info_string("PerPartitionOutputRows", partition_output_rows.data());
|
||||
runtime_profile()->add_info_string("PerPartitionOutputRows",
|
||||
fmt::to_string(partition_output_rows));
|
||||
}
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -21,6 +21,7 @@
|
||||
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
#include "exec/exec_node.h"
|
||||
#include "vec/columns/column.h"
|
||||
@ -329,6 +330,7 @@ public:
|
||||
Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override;
|
||||
|
||||
void debug_profile();
|
||||
bool can_read();
|
||||
|
||||
private:
|
||||
template <typename AggState, typename AggMethod>
|
||||
@ -370,6 +372,7 @@ private:
|
||||
std::unique_ptr<SortCursorCmp> _previous_row = nullptr;
|
||||
std::queue<Block> _blocks_buffer;
|
||||
int64_t child_input_rows = 0;
|
||||
std::mutex _buffer_mutex;
|
||||
|
||||
RuntimeProfile::Counter* _build_timer;
|
||||
RuntimeProfile::Counter* _emplace_key_timer;
|
||||
|
||||
Reference in New Issue
Block a user