From 449d0c219f31cac5dbe889dece97a5a7df3c0a2e Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 7 Sep 2022 10:34:28 +0800 Subject: [PATCH] [Improvement](sort) Accumulate blocks to do partial sort (#12336) --- be/src/vec/exec/vsort_node.cpp | 39 +++++++++++-------- be/src/vec/exec/vsort_node.h | 7 +++- .../window_functions/test_window_function.out | 12 +++--- .../test_window_function.groovy | 8 ++-- 4 files changed, 39 insertions(+), 27 deletions(-) diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 8749b2913e..f12a436d1d 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -22,13 +22,15 @@ #include "runtime/runtime_state.h" #include "util/debug_util.h" #include "vec/core/sort_block.h" +#include "vec/utils/util.hpp" namespace doris::vectorized { VSortNode::VSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) : ExecNode(pool, tnode, descs), _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), - _num_rows_skipped(0) {} + _num_rows_skipped(0), + _unsorted_block(nullptr) {} Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(ExecNode::init(tnode, state)); @@ -44,6 +46,8 @@ Status VSortNode::prepare(RuntimeState* state) { RETURN_IF_ERROR(ExecNode::prepare(state)); SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor)); + _unsorted_block.reset(new MutableBlock( + VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc()))); return Status::OK(); } @@ -124,15 +128,20 @@ void VSortNode::debug_string(int indentation_level, stringstream* out) const { Status VSortNode::sort_input(RuntimeState* state) { bool eos = false; do { - Block block; - RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, &eos), - child(0)->get_next_span(), eos); - auto rows = block.rows(); - - if (rows != 0) { - RETURN_IF_ERROR(pretreat_block(block)); - size_t mem_usage = block.allocated_bytes(); - + do { + Block upstream_block; + RETURN_IF_ERROR_AND_CHECK_SPAN( + child(0)->get_next_after_projects(state, &upstream_block, &eos), + child(0)->get_next_span(), eos); + if (upstream_block.rows() != 0) { + _unsorted_block->merge(upstream_block); + } + } while (!eos && _unsorted_block->rows() < BUFFERED_BLOCK_SIZE && + _unsorted_block->allocated_bytes() < BUFFERED_BLOCK_BYTES); + if (_unsorted_block->rows() > 0) { + _total_mem_usage += _unsorted_block->allocated_bytes(); + Block block = _unsorted_block->to_block(0); + RETURN_IF_ERROR(partial_sort(block)); // dispose TOP-N logic if (_limit != -1) { // Here is a little opt to reduce the mem uasge, we build a max heap @@ -140,9 +149,8 @@ Status VSortNode::sort_input(RuntimeState* state) { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_num_rows_in_block < _limit) { - _total_mem_usage += mem_usage; _sorted_blocks.emplace_back(std::move(block)); - _num_rows_in_block += rows; + _num_rows_in_block += block.rows(); _block_priority_queue.emplace(_pool->add( new SortCursorImpl(_sorted_blocks.back(), _sort_description))); } else { @@ -151,19 +159,18 @@ Status VSortNode::sort_input(RuntimeState* state) { if (!block_cursor.totally_greater(_block_priority_queue.top())) { _sorted_blocks.emplace_back(std::move(block)); _block_priority_queue.push(block_cursor); - _total_mem_usage += mem_usage; } else { continue; } } } else { // dispose normal sort logic - _total_mem_usage += mem_usage; _sorted_blocks.emplace_back(std::move(block)); } - RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input.")); + _unsorted_block.reset(new MutableBlock( + VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc()))); } } while (!eos); @@ -171,7 +178,7 @@ Status VSortNode::sort_input(RuntimeState* state) { return Status::OK(); } -Status VSortNode::pretreat_block(doris::vectorized::Block& block) { +Status VSortNode::partial_sort(doris::vectorized::Block& block) { if (_vsort_exec_exprs.need_materialize_tuple()) { auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs(); std::vector valid_column_ids(output_tuple_expr_ctxs.size()); diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h index f67326afa6..1565be5ca6 100644 --- a/be/src/vec/exec/vsort_node.h +++ b/be/src/vec/exec/vsort_node.h @@ -57,7 +57,7 @@ private: // Fetch input rows and feed them to the sorter until the input is exhausted. Status sort_input(RuntimeState* state); - Status pretreat_block(Block& block); + Status partial_sort(Block& block); void build_merge_tree(); @@ -84,6 +84,11 @@ private: // only valid in TOP-N node uint64_t _num_rows_in_block = 0; std::priority_queue _block_priority_queue; + + std::unique_ptr _unsorted_block; + + static constexpr size_t BUFFERED_BLOCK_SIZE = 1024 * 1024; + static constexpr size_t BUFFERED_BLOCK_BYTES = 16 << 20; }; } // namespace doris::vectorized diff --git a/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out b/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out index ae4252ae19..792acddd34 100644 --- a/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out +++ b/regression-test/data/query_p0/sql_functions/window_functions/test_window_function.out @@ -306,14 +306,14 @@ USA Pete Hello -- !last_value1 -- \N \N -9223372036854775807 false --9223372036854775807 false +-9223372036854775807 true -11011907 false -11011903 true 123456 true 7210457 false 11011902 false -11011902 false -11011902 false +11011902 true +11011902 true 11011903 false 11011905 false 11011920 true @@ -324,14 +324,14 @@ USA Pete Hello -- !last_value2 -- \N \N -9223372036854775807 false --9223372036854775807 false +-9223372036854775807 true -11011907 false -11011903 true 123456 true 7210457 false 11011902 false -11011902 false -11011902 false +11011902 true +11011902 true 11011903 false 11011905 false 11011920 true diff --git a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_function.groovy b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_function.groovy index 4c12b32e1d..6c8400b7ff 100644 --- a/regression-test/suites/query_p0/sql_functions/window_functions/test_window_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/window_functions/test_window_function.groovy @@ -336,12 +336,12 @@ suite("test_window_function") { order by a, wjj""" // test_query_last_value - qt_last_value1"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3}) - as wj from baseall order by ${k1}, wj""" - qt_last_value2"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3} + qt_last_value1"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2}) + as wj from baseall order by ${k1}, wj""" + qt_last_value2"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2} range between unbounded preceding and current row) as wj from baseall order by ${k1}, wj""" - qt_last_value3"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3} + qt_last_value3"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2} rows between unbounded preceding and current row) as wj from baseall order by ${k1}, wj""" qt_last_value4"""select a, max(d) as wjj from