From f4c03fe8e28ce2fd8ba6a969c5a16ad02c010dce Mon Sep 17 00:00:00 2001 From: HappenLee Date: Tue, 26 May 2020 10:20:57 +0800 Subject: [PATCH] 1. Delete the code of Sort Node we do not use now. (#3666) Optimize the quick sort by find_the_median and try to reduce recursion level of quick sort. --- be/src/exec/CMakeLists.txt | 1 - be/src/exec/exec_node.cpp | 1 - be/src/exec/sort_node.cpp | 156 --------------------------------- be/src/exec/sort_node.h | 76 ---------------- be/src/runtime/spill_sorter.cc | 67 ++++++++++---- 5 files changed, 51 insertions(+), 250 deletions(-) delete mode 100644 be/src/exec/sort_node.cpp delete mode 100644 be/src/exec/sort_node.h diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 4844338cae..c42afcea25 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -49,7 +49,6 @@ set(EXEC_FILES text_converter.cpp topn_node.cpp sort_exec_exprs.cpp - sort_node.cpp olap_rewrite_node.cpp olap_scan_node.cpp olap_scanner.cpp diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 9a6f208c9b..226e42a966 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -45,7 +45,6 @@ #include "exec/repeat_node.h" #include "exec/schema_scan_node.h" #include "exec/select_node.h" -#include "exec/sort_node.h" #include "exec/spill_sort_node.h" #include "exec/topn_node.h" #include "exec/union_node.h" diff --git a/be/src/exec/sort_node.cpp b/be/src/exec/sort_node.cpp deleted file mode 100644 index da1d9a337d..0000000000 --- a/be/src/exec/sort_node.cpp +++ /dev/null @@ -1,156 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/sort_node.h" -#include "exec/sort_exec_exprs.h" -#include "runtime/row_batch.h" -#include "runtime/runtime_state.h" -#include "util/runtime_profile.h" - -namespace doris { - -SortNode::SortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ExecNode(pool, tnode, descs), - _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), - _num_rows_skipped(0) { - Status status = init(tnode, nullptr); - DCHECK(status.ok()) << "SortNode c'tor:init failed: \n" << status.get_error_msg(); -} - -SortNode::~SortNode() { -} - -Status SortNode::init(const TPlanNode& tnode, RuntimeState* state) { - const vector* sort_tuple_slot_exprs = tnode.sort_node.__isset.sort_tuple_slot_exprs ? - &tnode.sort_node.sort_tuple_slot_exprs : NULL; - RETURN_IF_ERROR(_sort_exec_exprs.init(tnode.sort_node.ordering_exprs, - sort_tuple_slot_exprs, _pool)); - _is_asc_order = tnode.sort_node.is_asc_order; - _nulls_first = tnode.sort_node.nulls_first; - return Status::OK(); -} - -Status SortNode::prepare(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(ExecNode::prepare(state)); - RETURN_IF_ERROR(_sort_exec_exprs.prepare( - state, child(0)->row_desc(), _row_descriptor, expr_mem_tracker())); - return Status::OK(); -} - -Status SortNode::open(RuntimeState* state) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(ExecNode::open(state)); - RETURN_IF_ERROR(_sort_exec_exprs.open(state)); - RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(child(0)->open(state)); - - TupleRowComparator less_than( - _sort_exec_exprs.lhs_ordering_expr_ctxs(), _sort_exec_exprs.rhs_ordering_expr_ctxs(), - _is_asc_order, _nulls_first); - _sorter.reset(new MergeSorter( - less_than, _sort_exec_exprs.sort_tuple_slot_expr_ctxs(), - &_row_descriptor, runtime_profile(), state)); - - // The child has been opened and the sorter created. Sort the input. - // The final merge is done on-demand as rows are requested in GetNext(). - RETURN_IF_ERROR(sort_input(state)); - - // The child can be closed at this point. - child(0)->close(state); - return Status::OK(); -} - -Status SortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { - SCOPED_TIMER(_runtime_profile->total_time_counter()); - RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT)); - RETURN_IF_CANCELLED(state); - //RETURN_IF_ERROR(QueryMaintenance(state)); - - if (reached_limit()) { - *eos = true; - return Status::OK(); - } else { - *eos = false; - } - - DCHECK_EQ(row_batch->num_rows(), 0); - RETURN_IF_ERROR(_sorter->get_next(row_batch, eos)); - while ((_num_rows_skipped < _offset)) { - _num_rows_skipped += row_batch->num_rows(); - // Throw away rows in the output batch until the offset is skipped. - int rows_to_keep = _num_rows_skipped - _offset; - if (rows_to_keep > 0) { - row_batch->copy_rows(0, row_batch->num_rows() - rows_to_keep, rows_to_keep); - row_batch->set_num_rows(rows_to_keep); - } else { - row_batch->set_num_rows(0); - } - if (rows_to_keep > 0 || *eos) { - break; - } - RETURN_IF_ERROR(_sorter->get_next(row_batch, eos)); - } - - _num_rows_returned += row_batch->num_rows(); - if (reached_limit()) { - row_batch->set_num_rows(row_batch->num_rows() - (_num_rows_returned - _limit)); - *eos = true; - } - - COUNTER_SET(_rows_returned_counter, _num_rows_returned); - return Status::OK(); -} - -Status SortNode::close(RuntimeState* state) { - if (is_closed()) { - return Status::OK(); - } - _sort_exec_exprs.close(state); - _sorter.reset(); - return ExecNode::close(state); -} - -void SortNode::debug_string(int indentation_level, stringstream* out) const { - *out << string(indentation_level * 2, ' '); - *out << "SortNode("; - // << Expr::debug_string(_sort_exec_exprs.lhs_ordering_expr_ctxs()); - for (int i = 0; i < _is_asc_order.size(); ++i) { - *out << (i > 0 ? " " : "") - << (_is_asc_order[i] ? "asc" : "desc") - << " nulls " << (_nulls_first[i] ? "first" : "last"); - } - ExecNode::debug_string(indentation_level, out); - *out << ")"; -} - -Status SortNode::sort_input(RuntimeState* state) { - RowBatch batch(child(0)->row_desc(), state->batch_size(), mem_tracker()); - bool eos = false; - do { - batch.reset(); - RETURN_IF_ERROR(child(0)->get_next(state, &batch, &eos)); - RETURN_IF_ERROR(_sorter->add_batch(&batch)); - RETURN_IF_CANCELLED(state); - RETURN_IF_LIMIT_EXCEEDED(state, "Sort, while getting next from the child."); - // RETURN_IF_ERROR(QueryMaintenance(state)); - } while (!eos); - RETURN_IF_ERROR(_sorter->input_done()); - return Status::OK(); -} - -} diff --git a/be/src/exec/sort_node.h b/be/src/exec/sort_node.h deleted file mode 100644 index 2d5b1d88fc..0000000000 --- a/be/src/exec/sort_node.h +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef INF_DORIS_QE_SRC_BE_EXEC_SORT_NODE_H -#define INF_DORIS_QE_SRC_BE_EXEC_SORT_NODE_H - -#include "exec/exec_node.h" -#include "exec/sort_exec_exprs.h" -#include "runtime/merge_sorter.h" -#include "runtime/buffered_block_mgr.h" - -namespace doris { - -// Node that implements a full sort of its input with a fixed memory budget, spilling -// to disk if the input is larger than available memory. -// Uses Sorter and BufferedBlockMgr for the external sort implementation. -// Input rows to SortNode are materialized by the Sorter into a single tuple -// using the expressions specified in sort_exec_exprs_. -// In GetNext(), SortNode passes in the output batch to the sorter instance created -// in Open() to fill it with sorted rows. -// If a merge phase was performed in the sort, sorted rows are deep copied into -// the output batch. Otherwise, the sorter instance owns the sorted data. -class SortNode : public ExecNode { -public: - SortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - ~SortNode(); - - virtual Status prepare(RuntimeState* state); - virtual Status open(RuntimeState* state); - virtual Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status close(RuntimeState* state); - -protected: - virtual void debug_string(int indentation_level, std::stringstream* out) const; - -private: - Status init(const TPlanNode& tnode, RuntimeState* state = nullptr); - // Fetch input rows and feed them to the sorter until the input is exhausted. - Status sort_input(RuntimeState* state); - - // Create a block manager object and set it in block_mgr_. - // Returns and sets the query status to Status::MemoryLimitExceeded("Memory limit exceeded") if there is not - // enough memory for the sort. - Status create_block_mgr(RuntimeState* state); - - // Number of rows to skip. - int64_t _offset; - int64_t _num_rows_skipped; - - // Object used for external sorting. - boost::scoped_ptr _sorter; - - // Expressions and parameters used for tuple materialization and tuple comparison. - SortExecExprs _sort_exec_exprs; - std::vector _is_asc_order; - std::vector _nulls_first; - boost::scoped_ptr _tuple_pool; -}; - -} - -#endif diff --git a/be/src/runtime/spill_sorter.cc b/be/src/runtime/spill_sorter.cc index 0a5b3ffa1b..4456bc0be0 100644 --- a/be/src/runtime/spill_sorter.cc +++ b/be/src/runtime/spill_sorter.cc @@ -364,11 +364,16 @@ private: void insertion_sort(const TupleIterator& first, const TupleIterator& last); // Partitions the sequence of tuples in the range [first, last) in a run into two - // groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and - // tuples in the second group are >= pivot. Tuples are swapped in place to create the + // groups around the mid._current_tuple - i.e. tuples in first group are <= the mid._current_tuple + // and tuples in the second group are >= mid._current_tuple. Tuples are swapped in place to create the // groups and the index to the first element in the second group is returned. // Checks _state->is_cancelled() and returns early with an invalid result if true. - TupleIterator partition(TupleIterator first, TupleIterator last, Tuple* pivot); + TupleIterator partition(TupleIterator first, TupleIterator last, TupleIterator& mid); + + // Select the median of three iterator tuples. taking the median tends to help us select better + // pivots that more evenly split the input range. This method makes selection of + // bad pivots very infrequent. + void find_the_median(TupleIterator& first, TupleIterator& last, TupleIterator& mid); // Performs a quicksort of rows in the range [first, last) followed by insertion sort // for smaller groups of elements. @@ -931,12 +936,34 @@ void SpillSorter::TupleSorter::insertion_sort(const TupleIterator& first, } } -SpillSorter::TupleSorter::TupleIterator SpillSorter::TupleSorter::partition( - TupleIterator first, TupleIterator last, Tuple* pivot) { - // Copy pivot into temp_tuple since it points to a tuple within [first, last). - memcpy(_temp_tuple_buffer, pivot, _tuple_size); - +void SpillSorter::TupleSorter::find_the_median(TupleSorter::TupleIterator &first, + TupleSorter::TupleIterator &last, TupleSorter::TupleIterator &mid) { last.prev(); + auto f_com_result = _less_than_comp.compare(reinterpret_cast(&first._current_tuple), reinterpret_cast(&mid._current_tuple)); + auto l_com_result = _less_than_comp.compare(reinterpret_cast(&last._current_tuple), reinterpret_cast(&mid._current_tuple)); + if (f_com_result == -1 && l_com_result == -1) { + if (_less_than_comp(reinterpret_cast(&first._current_tuple),reinterpret_cast(&last._current_tuple))) { + swap(mid._current_tuple, last._current_tuple); + } else { + swap(mid._current_tuple, first._current_tuple); + } + } + if (f_com_result == 1 && l_com_result == 1) { + if (_less_than_comp(reinterpret_cast(&first._current_tuple), + reinterpret_cast(&last._current_tuple))) { + swap(mid._current_tuple, first._current_tuple); + } else { + swap(mid._current_tuple, last._current_tuple); + } + } +} + +SpillSorter::TupleSorter::TupleIterator SpillSorter::TupleSorter::partition( + TupleIterator first, TupleIterator last, TupleIterator& mid) { + find_the_median(first, last, mid); + + // Copy &mid._current_tuple into temp_tuple since it points to a tuple within [first, last). + memcpy(_temp_tuple_buffer, mid._current_tuple, _tuple_size); while (true) { // Search for the first and last out-of-place elements, and swap them. while (_less_than_comp( @@ -968,14 +995,22 @@ void SpillSorter::TupleSorter::sort_helper(TupleIterator first, TupleIterator la } // Use insertion sort for smaller sequences. while (last._index - first._index > INSERTION_THRESHOLD) { - TupleIterator iter(this, first._index + (last._index - first._index) / 2); - DCHECK(iter._current_tuple != NULL); - // partition() splits the tuples in [first, last) into two groups (<= pivot - // and >= pivot) in-place. 'cut' is the index of the first tuple in the second group. - TupleIterator cut = partition(first, last, - reinterpret_cast(iter._current_tuple)); - sort_helper(cut, last); - last = cut; + TupleIterator mid(this, first._index + (last._index - first._index) / 2); + + DCHECK(mid._current_tuple != NULL); + // partition() splits the tuples in [first, last) into two groups (<= mid iter + // and >= mid iter) in-place. 'cut' is the index of the first tuple in the second group. + TupleIterator cut = partition(first, last, mid); + + // Recurse on the smaller partition. This limits stack size to log(n) stack frames. + if (last._index - cut._index < cut._index - first._index) { + sort_helper(cut, last); + last = cut; + } else { + sort_helper(first, cut); + first = cut; + } + if (UNLIKELY(_state->is_cancelled())) { return; }