1. Delete the code of Sort Node we do not use now. (#3666)
Optimize the quick sort by find_the_median and try to reduce recursion level of quick sort.
This commit is contained in:
@ -49,7 +49,6 @@ set(EXEC_FILES
|
||||
text_converter.cpp
|
||||
topn_node.cpp
|
||||
sort_exec_exprs.cpp
|
||||
sort_node.cpp
|
||||
olap_rewrite_node.cpp
|
||||
olap_scan_node.cpp
|
||||
olap_scanner.cpp
|
||||
|
||||
@ -45,7 +45,6 @@
|
||||
#include "exec/repeat_node.h"
|
||||
#include "exec/schema_scan_node.h"
|
||||
#include "exec/select_node.h"
|
||||
#include "exec/sort_node.h"
|
||||
#include "exec/spill_sort_node.h"
|
||||
#include "exec/topn_node.h"
|
||||
#include "exec/union_node.h"
|
||||
|
||||
@ -1,156 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#include "exec/sort_node.h"
|
||||
#include "exec/sort_exec_exprs.h"
|
||||
#include "runtime/row_batch.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
#include "util/runtime_profile.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
SortNode::SortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
|
||||
: ExecNode(pool, tnode, descs),
|
||||
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
|
||||
_num_rows_skipped(0) {
|
||||
Status status = init(tnode, nullptr);
|
||||
DCHECK(status.ok()) << "SortNode c'tor:init failed: \n" << status.get_error_msg();
|
||||
}
|
||||
|
||||
SortNode::~SortNode() {
|
||||
}
|
||||
|
||||
Status SortNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
const vector<TExpr>* sort_tuple_slot_exprs = tnode.sort_node.__isset.sort_tuple_slot_exprs ?
|
||||
&tnode.sort_node.sort_tuple_slot_exprs : NULL;
|
||||
RETURN_IF_ERROR(_sort_exec_exprs.init(tnode.sort_node.ordering_exprs,
|
||||
sort_tuple_slot_exprs, _pool));
|
||||
_is_asc_order = tnode.sort_node.is_asc_order;
|
||||
_nulls_first = tnode.sort_node.nulls_first;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SortNode::prepare(RuntimeState* state) {
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_ERROR(ExecNode::prepare(state));
|
||||
RETURN_IF_ERROR(_sort_exec_exprs.prepare(
|
||||
state, child(0)->row_desc(), _row_descriptor, expr_mem_tracker()));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SortNode::open(RuntimeState* state) {
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_ERROR(ExecNode::open(state));
|
||||
RETURN_IF_ERROR(_sort_exec_exprs.open(state));
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_ERROR(child(0)->open(state));
|
||||
|
||||
TupleRowComparator less_than(
|
||||
_sort_exec_exprs.lhs_ordering_expr_ctxs(), _sort_exec_exprs.rhs_ordering_expr_ctxs(),
|
||||
_is_asc_order, _nulls_first);
|
||||
_sorter.reset(new MergeSorter(
|
||||
less_than, _sort_exec_exprs.sort_tuple_slot_expr_ctxs(),
|
||||
&_row_descriptor, runtime_profile(), state));
|
||||
|
||||
// The child has been opened and the sorter created. Sort the input.
|
||||
// The final merge is done on-demand as rows are requested in GetNext().
|
||||
RETURN_IF_ERROR(sort_input(state));
|
||||
|
||||
// The child can be closed at this point.
|
||||
child(0)->close(state);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
|
||||
SCOPED_TIMER(_runtime_profile->total_time_counter());
|
||||
RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
|
||||
RETURN_IF_CANCELLED(state);
|
||||
//RETURN_IF_ERROR(QueryMaintenance(state));
|
||||
|
||||
if (reached_limit()) {
|
||||
*eos = true;
|
||||
return Status::OK();
|
||||
} else {
|
||||
*eos = false;
|
||||
}
|
||||
|
||||
DCHECK_EQ(row_batch->num_rows(), 0);
|
||||
RETURN_IF_ERROR(_sorter->get_next(row_batch, eos));
|
||||
while ((_num_rows_skipped < _offset)) {
|
||||
_num_rows_skipped += row_batch->num_rows();
|
||||
// Throw away rows in the output batch until the offset is skipped.
|
||||
int rows_to_keep = _num_rows_skipped - _offset;
|
||||
if (rows_to_keep > 0) {
|
||||
row_batch->copy_rows(0, row_batch->num_rows() - rows_to_keep, rows_to_keep);
|
||||
row_batch->set_num_rows(rows_to_keep);
|
||||
} else {
|
||||
row_batch->set_num_rows(0);
|
||||
}
|
||||
if (rows_to_keep > 0 || *eos) {
|
||||
break;
|
||||
}
|
||||
RETURN_IF_ERROR(_sorter->get_next(row_batch, eos));
|
||||
}
|
||||
|
||||
_num_rows_returned += row_batch->num_rows();
|
||||
if (reached_limit()) {
|
||||
row_batch->set_num_rows(row_batch->num_rows() - (_num_rows_returned - _limit));
|
||||
*eos = true;
|
||||
}
|
||||
|
||||
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SortNode::close(RuntimeState* state) {
|
||||
if (is_closed()) {
|
||||
return Status::OK();
|
||||
}
|
||||
_sort_exec_exprs.close(state);
|
||||
_sorter.reset();
|
||||
return ExecNode::close(state);
|
||||
}
|
||||
|
||||
void SortNode::debug_string(int indentation_level, stringstream* out) const {
|
||||
*out << string(indentation_level * 2, ' ');
|
||||
*out << "SortNode(";
|
||||
// << Expr::debug_string(_sort_exec_exprs.lhs_ordering_expr_ctxs());
|
||||
for (int i = 0; i < _is_asc_order.size(); ++i) {
|
||||
*out << (i > 0 ? " " : "")
|
||||
<< (_is_asc_order[i] ? "asc" : "desc")
|
||||
<< " nulls " << (_nulls_first[i] ? "first" : "last");
|
||||
}
|
||||
ExecNode::debug_string(indentation_level, out);
|
||||
*out << ")";
|
||||
}
|
||||
|
||||
Status SortNode::sort_input(RuntimeState* state) {
|
||||
RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker());
|
||||
bool eos = false;
|
||||
do {
|
||||
batch.reset();
|
||||
RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos));
|
||||
RETURN_IF_ERROR(_sorter->add_batch(&batch));
|
||||
RETURN_IF_CANCELLED(state);
|
||||
RETURN_IF_LIMIT_EXCEEDED(state, "Sort, while getting next from the child.");
|
||||
// RETURN_IF_ERROR(QueryMaintenance(state));
|
||||
} while (!eos);
|
||||
RETURN_IF_ERROR(_sorter->input_done());
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,76 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
#ifndef INF_DORIS_QE_SRC_BE_EXEC_SORT_NODE_H
|
||||
#define INF_DORIS_QE_SRC_BE_EXEC_SORT_NODE_H
|
||||
|
||||
#include "exec/exec_node.h"
|
||||
#include "exec/sort_exec_exprs.h"
|
||||
#include "runtime/merge_sorter.h"
|
||||
#include "runtime/buffered_block_mgr.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
// Node that implements a full sort of its input with a fixed memory budget, spilling
|
||||
// to disk if the input is larger than available memory.
|
||||
// Uses Sorter and BufferedBlockMgr for the external sort implementation.
|
||||
// Input rows to SortNode are materialized by the Sorter into a single tuple
|
||||
// using the expressions specified in sort_exec_exprs_.
|
||||
// In GetNext(), SortNode passes in the output batch to the sorter instance created
|
||||
// in Open() to fill it with sorted rows.
|
||||
// If a merge phase was performed in the sort, sorted rows are deep copied into
|
||||
// the output batch. Otherwise, the sorter instance owns the sorted data.
|
||||
class SortNode : public ExecNode {
|
||||
public:
|
||||
SortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
~SortNode();
|
||||
|
||||
virtual Status prepare(RuntimeState* state);
|
||||
virtual Status open(RuntimeState* state);
|
||||
virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos);
|
||||
virtual Status close(RuntimeState* state);
|
||||
|
||||
protected:
|
||||
virtual void debug_string(int indentation_level, std::stringstream* out) const;
|
||||
|
||||
private:
|
||||
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr);
|
||||
// Fetch input rows and feed them to the sorter until the input is exhausted.
|
||||
Status sort_input(RuntimeState* state);
|
||||
|
||||
// Create a block manager object and set it in block_mgr_.
|
||||
// Returns and sets the query status to Status::MemoryLimitExceeded("Memory limit exceeded") if there is not
|
||||
// enough memory for the sort.
|
||||
Status create_block_mgr(RuntimeState* state);
|
||||
|
||||
// Number of rows to skip.
|
||||
int64_t _offset;
|
||||
int64_t _num_rows_skipped;
|
||||
|
||||
// Object used for external sorting.
|
||||
boost::scoped_ptr<MergeSorter> _sorter;
|
||||
|
||||
// Expressions and parameters used for tuple materialization and tuple comparison.
|
||||
SortExecExprs _sort_exec_exprs;
|
||||
std::vector<bool> _is_asc_order;
|
||||
std::vector<bool> _nulls_first;
|
||||
boost::scoped_ptr<MemPool> _tuple_pool;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
@ -364,11 +364,16 @@ private:
|
||||
void insertion_sort(const TupleIterator& first, const TupleIterator& last);
|
||||
|
||||
// Partitions the sequence of tuples in the range [first, last) in a run into two
|
||||
// groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and
|
||||
// tuples in the second group are >= pivot. Tuples are swapped in place to create the
|
||||
// groups around the mid._current_tuple - i.e. tuples in first group are <= the mid._current_tuple
|
||||
// and tuples in the second group are >= mid._current_tuple. Tuples are swapped in place to create the
|
||||
// groups and the index to the first element in the second group is returned.
|
||||
// Checks _state->is_cancelled() and returns early with an invalid result if true.
|
||||
TupleIterator partition(TupleIterator first, TupleIterator last, Tuple* pivot);
|
||||
TupleIterator partition(TupleIterator first, TupleIterator last, TupleIterator& mid);
|
||||
|
||||
// Select the median of three iterator tuples. taking the median tends to help us select better
|
||||
// pivots that more evenly split the input range. This method makes selection of
|
||||
// bad pivots very infrequent.
|
||||
void find_the_median(TupleIterator& first, TupleIterator& last, TupleIterator& mid);
|
||||
|
||||
// Performs a quicksort of rows in the range [first, last) followed by insertion sort
|
||||
// for smaller groups of elements.
|
||||
@ -931,12 +936,34 @@ void SpillSorter::TupleSorter::insertion_sort(const TupleIterator& first,
|
||||
}
|
||||
}
|
||||
|
||||
SpillSorter::TupleSorter::TupleIterator SpillSorter::TupleSorter::partition(
|
||||
TupleIterator first, TupleIterator last, Tuple* pivot) {
|
||||
// Copy pivot into temp_tuple since it points to a tuple within [first, last).
|
||||
memcpy(_temp_tuple_buffer, pivot, _tuple_size);
|
||||
|
||||
void SpillSorter::TupleSorter::find_the_median(TupleSorter::TupleIterator &first,
|
||||
TupleSorter::TupleIterator &last, TupleSorter::TupleIterator &mid) {
|
||||
last.prev();
|
||||
auto f_com_result = _less_than_comp.compare(reinterpret_cast<TupleRow*>(&first._current_tuple), reinterpret_cast<TupleRow*>(&mid._current_tuple));
|
||||
auto l_com_result = _less_than_comp.compare(reinterpret_cast<TupleRow*>(&last._current_tuple), reinterpret_cast<TupleRow*>(&mid._current_tuple));
|
||||
if (f_com_result == -1 && l_com_result == -1) {
|
||||
if (_less_than_comp(reinterpret_cast<TupleRow*>(&first._current_tuple),reinterpret_cast<TupleRow*>(&last._current_tuple))) {
|
||||
swap(mid._current_tuple, last._current_tuple);
|
||||
} else {
|
||||
swap(mid._current_tuple, first._current_tuple);
|
||||
}
|
||||
}
|
||||
if (f_com_result == 1 && l_com_result == 1) {
|
||||
if (_less_than_comp(reinterpret_cast<TupleRow *>(&first._current_tuple),
|
||||
reinterpret_cast<TupleRow *>(&last._current_tuple))) {
|
||||
swap(mid._current_tuple, first._current_tuple);
|
||||
} else {
|
||||
swap(mid._current_tuple, last._current_tuple);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SpillSorter::TupleSorter::TupleIterator SpillSorter::TupleSorter::partition(
|
||||
TupleIterator first, TupleIterator last, TupleIterator& mid) {
|
||||
find_the_median(first, last, mid);
|
||||
|
||||
// Copy &mid._current_tuple into temp_tuple since it points to a tuple within [first, last).
|
||||
memcpy(_temp_tuple_buffer, mid._current_tuple, _tuple_size);
|
||||
while (true) {
|
||||
// Search for the first and last out-of-place elements, and swap them.
|
||||
while (_less_than_comp(
|
||||
@ -968,14 +995,22 @@ void SpillSorter::TupleSorter::sort_helper(TupleIterator first, TupleIterator la
|
||||
}
|
||||
// Use insertion sort for smaller sequences.
|
||||
while (last._index - first._index > INSERTION_THRESHOLD) {
|
||||
TupleIterator iter(this, first._index + (last._index - first._index) / 2);
|
||||
DCHECK(iter._current_tuple != NULL);
|
||||
// partition() splits the tuples in [first, last) into two groups (<= pivot
|
||||
// and >= pivot) in-place. 'cut' is the index of the first tuple in the second group.
|
||||
TupleIterator cut = partition(first, last,
|
||||
reinterpret_cast<Tuple*>(iter._current_tuple));
|
||||
sort_helper(cut, last);
|
||||
last = cut;
|
||||
TupleIterator mid(this, first._index + (last._index - first._index) / 2);
|
||||
|
||||
DCHECK(mid._current_tuple != NULL);
|
||||
// partition() splits the tuples in [first, last) into two groups (<= mid iter
|
||||
// and >= mid iter) in-place. 'cut' is the index of the first tuple in the second group.
|
||||
TupleIterator cut = partition(first, last, mid);
|
||||
|
||||
// Recurse on the smaller partition. This limits stack size to log(n) stack frames.
|
||||
if (last._index - cut._index < cut._index - first._index) {
|
||||
sort_helper(cut, last);
|
||||
last = cut;
|
||||
} else {
|
||||
sort_helper(first, cut);
|
||||
first = cut;
|
||||
}
|
||||
|
||||
if (UNLIKELY(_state->is_cancelled())) {
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user