[Improvement](sort) Accumulate blocks to do partial sort (#12336)
This commit is contained in:
@ -22,13 +22,15 @@
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "util/debug_util.h"
|
||||
#include "vec/core/sort_block.h"
|
||||
#include "vec/utils/util.hpp"
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
VSortNode::VSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
|
||||
: ExecNode(pool, tnode, descs),
|
||||
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
|
||||
_num_rows_skipped(0) {}
|
||||
_num_rows_skipped(0),
|
||||
_unsorted_block(nullptr) {}
|
||||
|
||||
Status VSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ExecNode::init(tnode, state));
|
||||
@ -44,6 +46,8 @@ Status VSortNode::prepare(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ExecNode::prepare(state));
|
||||
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
|
||||
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor));
|
||||
_unsorted_block.reset(new MutableBlock(
|
||||
VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc())));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -124,15 +128,20 @@ void VSortNode::debug_string(int indentation_level, stringstream* out) const {
|
||||
Status VSortNode::sort_input(RuntimeState* state) {
|
||||
bool eos = false;
|
||||
do {
|
||||
Block block;
|
||||
RETURN_IF_ERROR_AND_CHECK_SPAN(child(0)->get_next_after_projects(state, &block, &eos),
|
||||
child(0)->get_next_span(), eos);
|
||||
auto rows = block.rows();
|
||||
|
||||
if (rows != 0) {
|
||||
RETURN_IF_ERROR(pretreat_block(block));
|
||||
size_t mem_usage = block.allocated_bytes();
|
||||
|
||||
do {
|
||||
Block upstream_block;
|
||||
RETURN_IF_ERROR_AND_CHECK_SPAN(
|
||||
child(0)->get_next_after_projects(state, &upstream_block, &eos),
|
||||
child(0)->get_next_span(), eos);
|
||||
if (upstream_block.rows() != 0) {
|
||||
_unsorted_block->merge(upstream_block);
|
||||
}
|
||||
} while (!eos && _unsorted_block->rows() < BUFFERED_BLOCK_SIZE &&
|
||||
_unsorted_block->allocated_bytes() < BUFFERED_BLOCK_BYTES);
|
||||
if (_unsorted_block->rows() > 0) {
|
||||
_total_mem_usage += _unsorted_block->allocated_bytes();
|
||||
Block block = _unsorted_block->to_block(0);
|
||||
RETURN_IF_ERROR(partial_sort(block));
|
||||
// dispose TOP-N logic
|
||||
if (_limit != -1) {
|
||||
// Here is a little opt to reduce the mem uasge, we build a max heap
|
||||
@ -140,9 +149,8 @@ Status VSortNode::sort_input(RuntimeState* state) {
|
||||
// if one block totally greater the heap top of _block_priority_queue
|
||||
// we can throw the block data directly.
|
||||
if (_num_rows_in_block < _limit) {
|
||||
_total_mem_usage += mem_usage;
|
||||
_sorted_blocks.emplace_back(std::move(block));
|
||||
_num_rows_in_block += rows;
|
||||
_num_rows_in_block += block.rows();
|
||||
_block_priority_queue.emplace(_pool->add(
|
||||
new SortCursorImpl(_sorted_blocks.back(), _sort_description)));
|
||||
} else {
|
||||
@ -151,19 +159,18 @@ Status VSortNode::sort_input(RuntimeState* state) {
|
||||
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
|
||||
_sorted_blocks.emplace_back(std::move(block));
|
||||
_block_priority_queue.push(block_cursor);
|
||||
_total_mem_usage += mem_usage;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// dispose normal sort logic
|
||||
_total_mem_usage += mem_usage;
|
||||
_sorted_blocks.emplace_back(std::move(block));
|
||||
}
|
||||
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(state->check_query_state("vsort, while sorting input."));
|
||||
_unsorted_block.reset(new MutableBlock(
|
||||
VectorizedUtils::create_empty_columnswithtypename(child(0)->row_desc())));
|
||||
}
|
||||
} while (!eos);
|
||||
|
||||
@ -171,7 +178,7 @@ Status VSortNode::sort_input(RuntimeState* state) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status VSortNode::pretreat_block(doris::vectorized::Block& block) {
|
||||
Status VSortNode::partial_sort(doris::vectorized::Block& block) {
|
||||
if (_vsort_exec_exprs.need_materialize_tuple()) {
|
||||
auto output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
|
||||
std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
|
||||
|
||||
@ -57,7 +57,7 @@ private:
|
||||
// Fetch input rows and feed them to the sorter until the input is exhausted.
|
||||
Status sort_input(RuntimeState* state);
|
||||
|
||||
Status pretreat_block(Block& block);
|
||||
Status partial_sort(Block& block);
|
||||
|
||||
void build_merge_tree();
|
||||
|
||||
@ -84,6 +84,11 @@ private:
|
||||
// only valid in TOP-N node
|
||||
uint64_t _num_rows_in_block = 0;
|
||||
std::priority_queue<SortBlockCursor> _block_priority_queue;
|
||||
|
||||
std::unique_ptr<MutableBlock> _unsorted_block;
|
||||
|
||||
static constexpr size_t BUFFERED_BLOCK_SIZE = 1024 * 1024;
|
||||
static constexpr size_t BUFFERED_BLOCK_BYTES = 16 << 20;
|
||||
};
|
||||
|
||||
} // namespace doris::vectorized
|
||||
|
||||
@ -306,14 +306,14 @@ USA Pete Hello
|
||||
-- !last_value1 --
|
||||
\N \N
|
||||
-9223372036854775807 false
|
||||
-9223372036854775807 false
|
||||
-9223372036854775807 true
|
||||
-11011907 false
|
||||
-11011903 true
|
||||
123456 true
|
||||
7210457 false
|
||||
11011902 false
|
||||
11011902 false
|
||||
11011902 false
|
||||
11011902 true
|
||||
11011902 true
|
||||
11011903 false
|
||||
11011905 false
|
||||
11011920 true
|
||||
@ -324,14 +324,14 @@ USA Pete Hello
|
||||
-- !last_value2 --
|
||||
\N \N
|
||||
-9223372036854775807 false
|
||||
-9223372036854775807 false
|
||||
-9223372036854775807 true
|
||||
-11011907 false
|
||||
-11011903 true
|
||||
123456 true
|
||||
7210457 false
|
||||
11011902 false
|
||||
11011902 false
|
||||
11011902 false
|
||||
11011902 true
|
||||
11011902 true
|
||||
11011903 false
|
||||
11011905 false
|
||||
11011920 true
|
||||
|
||||
@ -336,12 +336,12 @@ suite("test_window_function") {
|
||||
order by a, wjj"""
|
||||
|
||||
// test_query_last_value
|
||||
qt_last_value1"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3})
|
||||
as wj from baseall order by ${k1}, wj"""
|
||||
qt_last_value2"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3}
|
||||
qt_last_value1"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2})
|
||||
as wj from baseall order by ${k1}, wj"""
|
||||
qt_last_value2"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2}
|
||||
range between unbounded preceding and current row)
|
||||
as wj from baseall order by ${k1}, wj"""
|
||||
qt_last_value3"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3}
|
||||
qt_last_value3"""select ${k1}, last_value(${k2}) over (partition by ${k1} order by ${k3},${k2}
|
||||
rows between unbounded preceding and current row)
|
||||
as wj from baseall order by ${k1}, wj"""
|
||||
qt_last_value4"""select a, max(d) as wjj from
|
||||
|
||||
Reference in New Issue
Block a user