[vectorized](feature) support partition sort node (#19708)
@@ -62,6 +62,7 @@
#include "vec/exec/vempty_set_node.h"
#include "vec/exec/vexchange_node.h"
#include "vec/exec/vmysql_scan_node.h" // IWYU pragma: keep
#include "vec/exec/vpartition_sort_node.h"
#include "vec/exec/vrepeat_node.h"
#include "vec/exec/vschema_scan_node.h"
#include "vec/exec/vselect_node.h"
@@ -318,6 +319,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
    case TPlanNodeType::FILE_SCAN_NODE:
    case TPlanNodeType::JDBC_SCAN_NODE:
    case TPlanNodeType::META_SCAN_NODE:
    case TPlanNodeType::PARTITION_SORT_NODE:
        break;
    default: {
        const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
@@ -438,6 +440,9 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN
        *node = pool->add(new vectorized::VDataGenFunctionScanNode(pool, tnode, descs));
        return Status::OK();

    case TPlanNodeType::PARTITION_SORT_NODE:
        *node = pool->add(new vectorized::VPartitionSortNode(pool, tnode, descs));
        return Status::OK();
    default:
        std::map<int, const char*>::const_iterator i =
                _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
be/src/pipeline/exec/partition_sort_sink_operator.h (new file, 54 lines)
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdint.h>

#include "operator.h"
#include "vec/exec/vpartition_sort_node.h"

namespace doris {
class ExecNode;

namespace pipeline {

class PartitionSortSinkOperatorBuilder final
        : public OperatorBuilder<vectorized::VPartitionSortNode> {
public:
    PartitionSortSinkOperatorBuilder(int32_t id, ExecNode* sort_node)
            : OperatorBuilder(id, "PartitionSortSinkOperator", sort_node) {}

    bool is_sink() const override { return true; }

    OperatorPtr build_operator() override;
};

class PartitionSortSinkOperator final : public StreamingOperator<PartitionSortSinkOperatorBuilder> {
public:
    PartitionSortSinkOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node)
            : StreamingOperator(operator_builder, sort_node) {}

    bool can_write() override { return true; }
};

OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() {
    return std::make_shared<PartitionSortSinkOperator>(this, _node);
}

} // namespace pipeline
} // namespace doris
be/src/pipeline/exec/partition_sort_source_operator.h (new file, 56 lines)
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <stdint.h>

#include "common/status.h"
#include "operator.h"
#include "vec/exec/vpartition_sort_node.h"

namespace doris {
class ExecNode;
class RuntimeState;

namespace pipeline {

class PartitionSortSourceOperatorBuilder final
        : public OperatorBuilder<vectorized::VPartitionSortNode> {
public:
    PartitionSortSourceOperatorBuilder(int32_t id, ExecNode* sort_node)
            : OperatorBuilder(id, "PartitionSortSourceOperator", sort_node) {}

    bool is_source() const override { return true; }

    OperatorPtr build_operator() override;
};

class PartitionSortSourceOperator final
        : public SourceOperator<PartitionSortSourceOperatorBuilder> {
public:
    PartitionSortSourceOperator(OperatorBuilderBase* operator_builder, ExecNode* sort_node)
            : SourceOperator(operator_builder, sort_node) {}
    Status open(RuntimeState*) override { return Status::OK(); }
};

OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() {
    return std::make_shared<PartitionSortSourceOperator>(this, _node);
}

} // namespace pipeline
} // namespace doris
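For orientation, here is a minimal standalone sketch of the pattern these two headers implement: a sink operator and a source operator wrap the same exec node, so one pipeline can push blocks in while another pulls results out once the sink has seen end-of-stream. All names below (`FakeSortNode` and friends) are hypothetical stand-ins, not the Doris operator framework.

```cpp
#include <queue>
#include <utility>
#include <vector>

// Hypothetical stand-in for a data block.
struct FakeBlock {
    std::vector<int> rows;
};

// The node owns the state both operators touch, like VPartitionSortNode does.
struct FakeSortNode {
    std::queue<FakeBlock> ready_blocks; // filled by the sink, drained by the source
    bool can_read = false;              // flipped once the sink has seen eos

    void sink(FakeBlock block, bool eos) {
        ready_blocks.push(std::move(block));
        if (eos) can_read = true;
    }
    bool pull(FakeBlock* out) {
        if (!can_read || ready_blocks.empty()) return false;
        *out = std::move(ready_blocks.front());
        ready_blocks.pop();
        return true;
    }
};

// The operators hold no state of their own; they only forward to the node.
class FakeSinkOperator {
public:
    explicit FakeSinkOperator(FakeSortNode* node) : _node(node) {}
    void push(FakeBlock block, bool eos) { _node->sink(std::move(block), eos); }

private:
    FakeSortNode* _node;
};

class FakeSourceOperator {
public:
    explicit FakeSourceOperator(FakeSortNode* node) : _node(node) {}
    bool pull(FakeBlock* out) { return _node->pull(out); }

private:
    FakeSortNode* _node;
};

int main() {
    FakeSortNode node;
    FakeSinkOperator sink(&node);     // lives in the upstream pipeline
    FakeSourceOperator source(&node); // lives in the downstream pipeline

    sink.push(FakeBlock{{1, 2, 3}}, /*eos=*/true);
    FakeBlock out;
    return source.pull(&out) ? 0 : 1;
}
```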
@@ -61,6 +61,8 @@
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_file_sink_operator.h"
#include "pipeline/exec/result_sink_operator.h"
@@ -532,6 +534,20 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
        RETURN_IF_ERROR(cur_pipe->add_operator(sort_source));
        break;
    }
    case TPlanNodeType::PARTITION_SORT_NODE: {
        auto new_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));

        OperatorBuilderPtr partition_sort_sink = std::make_shared<PartitionSortSinkOperatorBuilder>(
                next_operator_builder_id(), node);
        RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink));

        OperatorBuilderPtr partition_sort_source =
                std::make_shared<PartitionSortSourceOperatorBuilder>(next_operator_builder_id(),
                                                                     node);
        RETURN_IF_ERROR(cur_pipe->add_operator(partition_sort_source));
        break;
    }
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
        auto new_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
@@ -71,6 +71,7 @@ set(VEC_FILES
    common/sort/sorter.cpp
    common/sort/topn_sorter.cpp
    common/sort/vsort_exec_exprs.cpp
    common/sort/partition_sorter.cpp
    common/string_utils/string_utils.cpp
    common/hex.cpp
    common/allocator.cpp
@@ -136,6 +137,7 @@ set(VEC_FILES
    exec/vrepeat_node.cpp
    exec/vtable_function_node.cpp
    exec/vjdbc_connector.cpp
    exec/vpartition_sort_node.cpp
    exec/join/vhash_join_node.cpp
    exec/join/vjoin_node_base.cpp
    exec/join/vnested_loop_join_node.cpp
@@ -258,6 +258,38 @@ struct HashMethodSingleLowNullableColumn : public SingleColumnMethod {
        return EmplaceResult(inserted);
    }

    template <typename Data>
    ALWAYS_INLINE EmplaceResult emplace_key(Data& data, size_t hash_value, size_t row,
                                            Arena& pool) {
        if (key_column->is_null_at(row)) {
            bool has_null_key = data.has_null_key_data();
            data.has_null_key_data() = true;

            if constexpr (has_mapped) {
                return EmplaceResult(data.get_null_key_data(), data.get_null_key_data(),
                                     !has_null_key);
            } else {
                return EmplaceResult(!has_null_key);
            }
        }

        auto key_holder = Base::get_key_holder(row, pool);

        bool inserted = false;
        typename Data::LookupResult it;
        data.emplace(key_holder, it, hash_value, inserted);

        if constexpr (has_mapped) {
            auto& mapped = *lookup_result_get_mapped(it);
            if (inserted) {
                new (&mapped) Mapped();
            }
            return EmplaceResult(mapped, mapped, inserted);
        } else {
            return EmplaceResult(inserted);
        }
    }

    template <typename Data, typename Func, typename CreatorForNull>
    ALWAYS_INLINE typename std::enable_if_t<has_mapped, Mapped>& lazy_emplace_key(
            Data& data, size_t row, Arena& pool, Func&& f, CreatorForNull&& null_creator) {
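The null-key branch in `emplace_key` above keeps NULL out of the hash table proper and stores its mapped value in a dedicated side slot. A rough standalone analogue of that layout, using `std::unordered_map` in place of Doris's hash tables (all names illustrative):

```cpp
#include <cstdio>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Illustrative analogue of PartitionDataWithNullKey: the NULL key bypasses
// the table and lives in a side slot, so the table never hashes a null.
template <typename Mapped>
struct MapWithNullKey {
    std::unordered_map<std::string, Mapped> table;
    bool has_null_key = false;
    Mapped null_key_data{};

    // Returns {mapped*, inserted}, mirroring emplace_key's EmplaceResult.
    std::pair<Mapped*, bool> emplace(const std::optional<std::string>& key) {
        if (!key.has_value()) {
            bool inserted = !has_null_key; // inserted only on the first NULL
            has_null_key = true;
            return {&null_key_data, inserted};
        }
        auto [it, inserted] = table.try_emplace(*key);
        return {&it->second, inserted};
    }
};

int main() {
    MapWithNullKey<int> m;
    auto [v1, ins1] = m.emplace(std::nullopt); // first NULL: inserted == true
    auto [v2, ins2] = m.emplace(std::nullopt); // second NULL: same side slot
    std::printf("%d %d same_slot=%d\n", ins1, ins2, v1 == v2);
    return 0;
}
```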
be/src/vec/common/sort/partition_sorter.cpp (new file, 203 lines)
@@ -0,0 +1,203 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/common/sort/partition_sorter.h"

#include <glog/logging.h>

#include <algorithm>
#include <queue>

#include "common/object_pool.h"
#include "vec/core/block.h"
#include "vec/core/sort_cursor.h"
#include "vec/functions/function_binary_arithmetic.h"
#include "vec/utils/util.hpp"

namespace doris {
class RowDescriptor;
class RuntimeProfile;
class RuntimeState;

namespace vectorized {
class VSortExecExprs;
} // namespace vectorized
} // namespace doris

namespace doris::vectorized {

PartitionSorter::PartitionSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset,
                                 ObjectPool* pool, std::vector<bool>& is_asc_order,
                                 std::vector<bool>& nulls_first, const RowDescriptor& row_desc,
                                 RuntimeState* state, RuntimeProfile* profile,
                                 bool has_global_limit, int partition_inner_limit,
                                 TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row)
        : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
          _state(MergeSorterState::create_unique(row_desc, offset, limit, state, profile)),
          _row_desc(row_desc),
          _has_global_limit(has_global_limit),
          _partition_inner_limit(partition_inner_limit),
          _top_n_algorithm(top_n_algorithm),
          _previous_row(previous_row) {}

Status PartitionSorter::append_block(Block* input_block) {
    Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
    DCHECK(input_block->columns() == sorted_block.columns());
    RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
    RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
    return Status::OK();
}

Status PartitionSorter::prepare_for_read() {
    auto& cursors = _state->get_cursors();
    auto& blocks = _state->get_sorted_block();
    auto& priority_queue = _state->get_priority_queue();
    for (const auto& block : blocks) {
        cursors.emplace_back(block, _sort_description);
    }
    for (auto& cursor : cursors) {
        priority_queue.push(MergeSortCursor(&cursor));
    }
    return Status::OK();
}

Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
    if (_state->get_sorted_block().empty()) {
        *eos = true;
    } else {
        if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
            auto& sorted_block = _state->get_sorted_block()[0];
            block->swap(sorted_block);
            block->set_num_rows(_partition_inner_limit);
            *eos = true;
        } else {
            RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
        }
    }
    return Status::OK();
}

Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
    const auto& sorted_block = _state->get_sorted_block()[0];
    size_t num_columns = sorted_block.columns();
    bool mem_reuse = output_block->mem_reuse();
    MutableColumns merged_columns =
            mem_reuse ? output_block->mutate_columns() : sorted_block.clone_empty_columns();

    size_t current_output_rows = 0;
    auto& priority_queue = _state->get_priority_queue();

    bool get_enough_data = false;
    bool first_compare_row = false;
    while (!priority_queue.empty()) {
        auto current = priority_queue.top();
        priority_queue.pop();
        if (UNLIKELY(_previous_row->impl == nullptr)) {
            first_compare_row = true;
            *_previous_row = current;
        }

        switch (_top_n_algorithm) {
        case TopNAlgorithm::ROW_NUMBER: {
            // row_number(): no distinctness check is needed, just output
            // partition_inner_limit rows per partition
            if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
                for (size_t i = 0; i < num_columns; ++i) {
                    merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
                }
            } else {
                // enough rows have been output
                get_enough_data = true;
            }
            current_output_rows++;
            break;
        }
        case TopNAlgorithm::DENSE_RANK: {
            // dense_rank() may need partition_inner_limit distinct rows
            if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
                for (size_t i = 0; i < num_columns; ++i) {
                    merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
                }
            } else {
                get_enough_data = true;
            }
            if (_has_global_limit) {
                current_output_rows++;
            } else {
                if (first_compare_row) {
                    // the first row of the partition has nothing to compare against
                    current_output_rows++;
                    first_compare_row = false;
                } else {
                    // otherwise compare with the previous row; only a distinct row
                    // advances current_output_rows
                    bool cmp_res = _previous_row->compare_two_rows(current);
                    if (!cmp_res) { // distinct row
                        current_output_rows++;
                        *_previous_row = current;
                    }
                }
            }
            break;
        }
        case TopNAlgorithm::RANK: {
            if (_has_global_limit &&
                (current_output_rows + _output_total_rows) >= _partition_inner_limit) {
                get_enough_data = true;
                break;
            }
            bool cmp_res = _previous_row->compare_two_rows(current);
            if (!cmp_res) {
                // got a distinct row: the distinctness of the two rows must be
                // checked first, then the number of rows already output
                if ((current_output_rows + _output_total_rows) >= _partition_inner_limit) {
                    get_enough_data = true;
                    break;
                }
                *_previous_row = current;
            }
            for (size_t i = 0; i < num_columns; ++i) {
                merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
            }
            current_output_rows++;
            break;
        }
        default:
            break;
        }

        if (!current->isLast()) {
            current->next();
            priority_queue.push(current);
        }

        if (current_output_rows == batch_size || get_enough_data) {
            break;
        }
    }

    if (!mem_reuse) {
        Block merge_block = sorted_block.clone_with_columns(std::move(merged_columns));
        merge_block.swap(*output_block);
    }
    _output_total_rows += output_block->rows();
    if (current_output_rows == 0 || get_enough_data) {
        *eos = true;
    }
    return Status::OK();
}

} // namespace doris::vectorized
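To make the three switch branches concrete: for sorted keys {10, 10, 20, 20, 30} and a per-partition limit of 3, row_number() keeps exactly 3 rows, rank() keeps 4 (ranks are 1, 1, 3, 3, 5, so the tied 20s both pass), and dense_rank() keeps all 5 (there are only 3 distinct keys). A self-contained sketch of that counting, with plain ints standing in for sort cursors:

```cpp
#include <cstdio>
#include <vector>

enum class TopN { ROW_NUMBER, RANK, DENSE_RANK };

// Counts how many rows of one sorted partition survive "fn() <= limit",
// mirroring the counting logic in partition_sort_read above.
static int rows_kept(const std::vector<int>& sorted_keys, TopN algo, int limit) {
    int kept = 0;
    int counter = 0; // the current row_number / rank / dense_rank value
    for (size_t i = 0; i < sorted_keys.size(); ++i) {
        bool distinct = (i == 0) || sorted_keys[i] != sorted_keys[i - 1];
        switch (algo) {
        case TopN::ROW_NUMBER:
            counter++; // every row advances row_number
            break;
        case TopN::DENSE_RANK:
            if (distinct) counter++; // only a distinct row advances dense_rank
            break;
        case TopN::RANK:
            if (distinct) counter = static_cast<int>(i) + 1; // rank jumps past ties
            break;
        }
        if (counter > limit) return kept;
        kept++;
    }
    return kept;
}

int main() {
    std::vector<int> keys = {10, 10, 20, 20, 30};
    std::printf("row_number: %d\n", rows_kept(keys, TopN::ROW_NUMBER, 3)); // 3
    std::printf("rank:       %d\n", rows_kept(keys, TopN::RANK, 3));       // 4
    std::printf("dense_rank: %d\n", rows_kept(keys, TopN::DENSE_RANK, 3)); // 5
    return 0;
}
```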
be/src/vec/common/sort/partition_sorter.h (new file, 108 lines)
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once
#include <gen_cpp/PlanNodes_types.h>
#include <stddef.h>
#include <stdint.h>

#include <cstdint>
#include <memory>
#include <vector>

#include "common/status.h"
#include "vec/common/sort/sorter.h"

namespace doris {
class ObjectPool;
class RowDescriptor;
class RuntimeProfile;
class RuntimeState;

namespace vectorized {
class Block;
class VSortExecExprs;
} // namespace vectorized
} // namespace doris

namespace doris::vectorized {

struct SortCursorCmp {
public:
    SortCursorCmp() {
        impl = nullptr;
        row = 0;
    }
    SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {}

    void reset() {
        impl = nullptr;
        row = 0;
    }
    // returns true iff the cached row equals rhs on every sort column
    bool compare_two_rows(const MergeSortCursor& rhs) const {
        for (size_t i = 0; i < impl->sort_columns_size; ++i) {
            int direction = impl->desc[i].direction;
            int nulls_direction = impl->desc[i].nulls_direction;
            int res = direction * impl->sort_columns[i]->compare_at(row, rhs.impl->pos,
                                                                    *(rhs.impl->sort_columns[i]),
                                                                    nulls_direction);
            if (res != 0) {
                return false;
            }
        }
        return true;
    }
    int row = 0;
    MergeSortCursorImpl* impl;
};

class PartitionSorter final : public Sorter {
    ENABLE_FACTORY_CREATOR(PartitionSorter);

public:
    PartitionSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool,
                    std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first,
                    const RowDescriptor& row_desc, RuntimeState* state, RuntimeProfile* profile,
                    bool has_global_limit, int partition_inner_limit,
                    TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row);

    ~PartitionSorter() override = default;

    Status append_block(Block* block) override;

    Status prepare_for_read() override;

    Status get_next(RuntimeState* state, Block* block, bool* eos) override;

    size_t data_size() const override { return _state->data_size(); }

    bool is_spilled() const override { return false; }

    Status partition_sort_read(Block* block, bool* eos, int batch_size);
    int64 get_output_rows() const { return _output_total_rows; }

private:
    std::unique_ptr<MergeSorterState> _state;
    const RowDescriptor& _row_desc;
    int64 _output_total_rows = 0;
    bool _has_global_limit = false;
    int _partition_inner_limit = 0;
    TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::type::ROW_NUMBER;
    SortCursorCmp* _previous_row;
};

} // namespace doris::vectorized
@@ -60,13 +60,14 @@ public:
              limit_(limit),
              profile_(profile) {
        external_sort_bytes_threshold_ = state->external_sort_bytes_threshold();
        if (profile != nullptr) {
            block_spill_profile_ = profile->create_child("BlockSpill", true, true);
            profile->add_child(block_spill_profile_, false, nullptr);

            spilled_block_count_ = ADD_COUNTER(block_spill_profile_, "BlockCount", TUnit::UNIT);
            spilled_original_block_size_ =
                    ADD_COUNTER(block_spill_profile_, "BlockBytes", TUnit::BYTES);
        }
    }

    ~MergeSorterState() = default;
@@ -91,6 +92,10 @@ public:

    const Block& last_sorted_block() const { return sorted_blocks_.back(); }

    std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
    std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; }
    std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }

    std::unique_ptr<Block> unsorted_block_;

private:
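These new accessors expose MergeSorterState's sorted blocks, cursors, and priority queue so PartitionSorter can drive its own k-way merge. A minimal standalone illustration of that merge loop, with plain sorted vectors instead of block cursors:

```cpp
#include <cstdio>
#include <queue>
#include <vector>

// Each "cursor" walks one sorted run, like MergeSortCursor over a sorted block.
struct Cursor {
    const std::vector<int>* run;
    size_t pos = 0;
    int value() const { return (*run)[pos]; }
    bool is_last() const { return pos + 1 >= run->size(); }
};

// Min-heap ordering: the cursor with the smallest head comes out first.
struct GreaterByValue {
    bool operator()(const Cursor& a, const Cursor& b) const { return a.value() > b.value(); }
};

int main() {
    std::vector<int> run1 = {1, 4, 7};
    std::vector<int> run2 = {2, 3, 9};
    std::priority_queue<Cursor, std::vector<Cursor>, GreaterByValue> queue;
    queue.push(Cursor{&run1});
    queue.push(Cursor{&run2});

    // Pop the smallest head, emit it, and re-push the advanced cursor --
    // the same loop shape as partition_sort_read above.
    while (!queue.empty()) {
        Cursor current = queue.top();
        queue.pop();
        std::printf("%d ", current.value());
        if (!current.is_last()) {
            current.pos++;
            queue.push(current);
        }
    }
    std::printf("\n"); // prints: 1 2 3 4 7 9
    return 0;
}
```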
be/src/vec/exec/vpartition_sort_node.cpp (new file, 454 lines)
@@ -0,0 +1,454 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "vec/exec/vpartition_sort_node.h"

#include <glog/logging.h>

#include <cstddef>
#include <cstdint>
#include <memory>
#include <sstream>
#include <string>

#include "common/logging.h"
#include "common/object_pool.h"
#include "runtime/runtime_state.h"
#include "vec/common/hash_table/hash_set.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

namespace doris::vectorized {
// Here is an empirical value.
static constexpr size_t HASH_MAP_PREFETCH_DIST = 16;
VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode,
                                       const DescriptorTbl& descs)
        : ExecNode(pool, tnode, descs), _hash_table_size_counter(nullptr) {
    _partitioned_data = std::make_unique<PartitionedHashMapVariants>();
    _agg_arena_pool = std::make_unique<Arena>();
    _previous_row = std::make_unique<SortCursorCmp>();
}

Status VPartitionSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
    RETURN_IF_ERROR(ExecNode::init(tnode, state));

    // order-by keys
    if (tnode.partition_sort_node.__isset.sort_info) {
        RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.partition_sort_node.sort_info, _pool));
        _is_asc_order = tnode.partition_sort_node.sort_info.is_asc_order;
        _nulls_first = tnode.partition_sort_node.sort_info.nulls_first;
    }
    // partition-by keys
    if (tnode.partition_sort_node.__isset.partition_exprs) {
        RETURN_IF_ERROR(VExpr::create_expr_trees(_pool, tnode.partition_sort_node.partition_exprs,
                                                 &_partition_expr_ctxs));
        _partition_exprs_num = _partition_expr_ctxs.size();
        _partition_columns.resize(_partition_exprs_num);
    }
    if (_partition_exprs_num == 0) {
        _value_places.push_back(_pool->add(new PartitionBlocks()));
    }

    _has_global_limit = tnode.partition_sort_node.has_global_limit;
    _top_n_algorithm = tnode.partition_sort_node.top_n_algorithm;
    _partition_inner_limit = tnode.partition_sort_node.partition_inner_limit;
    return Status::OK();
}

Status VPartitionSortNode::prepare(RuntimeState* state) {
    VLOG_CRITICAL << "VPartitionSortNode::prepare";
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    _hash_table_size_counter = ADD_COUNTER(_runtime_profile, "HashTableSize", TUnit::UNIT);
    _build_timer = ADD_TIMER(runtime_profile(), "HashTableBuildTime");
    _partition_sort_timer = ADD_TIMER(runtime_profile(), "PartitionSortTime");
    _get_sorted_timer = ADD_TIMER(runtime_profile(), "GetSortedTime");
    _selector_block_timer = ADD_TIMER(runtime_profile(), "SelectorBlockTime");
    _emplace_key_timer = ADD_TIMER(runtime_profile(), "EmplaceKeyTime");

    RETURN_IF_ERROR(ExecNode::prepare(state));
    RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor));
    RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, child(0)->row_desc()));
    _init_hash_method();

    return Status::OK();
}

Status VPartitionSortNode::_split_block_by_partition(vectorized::Block* input_block,
                                                     int batch_size) {
    for (int i = 0; i < _partition_exprs_num; ++i) {
        int result_column_id = -1;
        RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id));
        DCHECK(result_column_id != -1);
        _partition_columns[i] = input_block->get_by_position(result_column_id).column.get();
    }
    _emplace_into_hash_table(_partition_columns, input_block, batch_size);
    return Status::OK();
}

void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_columns,
                                                  const vectorized::Block* input_block,
                                                  int batch_size) {
    std::visit(
            [&](auto&& agg_method) -> void {
                SCOPED_TIMER(_build_timer);
                using HashMethodType = std::decay_t<decltype(agg_method)>;
                using HashTableType = std::decay_t<decltype(agg_method.data)>;
                using AggState = typename HashMethodType::State;

                AggState state(key_columns, _partition_key_sz, nullptr);
                size_t num_rows = input_block->rows();
                _pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);

                // PHHashMap
                if constexpr (HashTableTraits<HashTableType>::is_phmap) {
                    if (_hash_values.size() < num_rows) {
                        _hash_values.resize(num_rows);
                    }
                    if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
                                          AggState>::value) {
                        for (size_t i = 0; i < num_rows; ++i) {
                            _hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
                        }
                    } else {
                        for (size_t i = 0; i < num_rows; ++i) {
                            _hash_values[i] =
                                    agg_method.data.hash(state.get_key_holder(i, *_agg_arena_pool));
                        }
                    }
                }

                for (size_t row = 0; row < num_rows; ++row) {
                    SCOPED_TIMER(_emplace_key_timer);
                    PartitionDataPtr aggregate_data = nullptr;
                    auto emplace_result = [&]() {
                        if constexpr (HashTableTraits<HashTableType>::is_phmap) {
                            if (LIKELY(row + HASH_MAP_PREFETCH_DIST < num_rows)) {
                                agg_method.data.prefetch_by_hash(
                                        _hash_values[row + HASH_MAP_PREFETCH_DIST]);
                            }
                            return state.emplace_key(agg_method.data, _hash_values[row], row,
                                                     *_agg_arena_pool);
                        } else {
                            return state.emplace_key(agg_method.data, row, *_agg_arena_pool);
                        }
                    }();

                    /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
                    if (emplace_result.is_inserted()) {
                        /// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
                        emplace_result.set_mapped(nullptr);
                        aggregate_data = _pool->add(new PartitionBlocks());
                        emplace_result.set_mapped(aggregate_data);
                        _value_places.push_back(aggregate_data);
                        _num_partition++;
                    } else {
                        aggregate_data = emplace_result.get_mapped();
                    }
                    assert(aggregate_data != nullptr);
                    aggregate_data->add_row_idx(row);
                }
                for (auto place : _value_places) {
                    SCOPED_TIMER(_selector_block_timer);
                    place->append_block_by_selector(input_block, child(0)->row_desc(),
                                                    _has_global_limit, _partition_inner_limit,
                                                    batch_size);
                }
            },
            _partitioned_data->_partition_method_variant);
}

Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_block, bool eos) {
    auto current_rows = input_block->rows();
    if (current_rows > 0) {
        child_input_rows = child_input_rows + current_rows;
        if (UNLIKELY(_partition_exprs_num == 0)) {
            // no partition keys: all rows go into the single partition
            _value_places[0]->append_whole_block(input_block, child(0)->row_desc());
        } else {
            // a simple heuristic on the partition count: with many partitions and
            // few input rows per partition, buffer the block instead of splitting it
            // TODO: could set _can_read to true directly here; that needs a mutex
            if (_num_partition > 512 && child_input_rows < 10000 * _num_partition) {
                _blocks_buffer.push(std::move(*input_block));
            } else {
                RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size()));
                RETURN_IF_CANCELLED(state);
                RETURN_IF_ERROR(
                        state->check_query_state("VPartitionSortNode, while split input block."));
                input_block->clear_column_data();
            }
        }
    }

    if (eos) {
        // the hash table is no longer needed, so free it
        _agg_arena_pool.reset(nullptr);
        _partitioned_data.reset(nullptr);
        SCOPED_TIMER(_partition_sort_timer);
        for (int i = 0; i < _value_places.size(); ++i) {
            auto sorter = PartitionSorter::create_unique(
                    _vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first,
                    child(0)->row_desc(), state, i == 0 ? _runtime_profile.get() : nullptr,
                    _has_global_limit, _partition_inner_limit, _top_n_algorithm,
                    _previous_row.get());

            DCHECK(child(0)->row_desc().num_materialized_slots() ==
                   _value_places[i]->blocks.back()->columns());
            // feed every block of this partition into its sorter
            for (const auto& block : _value_places[i]->blocks) {
                RETURN_IF_ERROR(sorter->append_block(block.get()));
            }
            sorter->init_profile(_runtime_profile.get());
            RETURN_IF_ERROR(sorter->prepare_for_read());
            _partition_sorts.push_back(std::move(sorter));
        }
        if (state->enable_profile()) {
            debug_profile();
        }
        COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition));
        _can_read = true;
    }
    return Status::OK();
}

Status VPartitionSortNode::open(RuntimeState* state) {
    VLOG_CRITICAL << "VPartitionSortNode::open";
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    RETURN_IF_ERROR(ExecNode::open(state));
    RETURN_IF_ERROR(child(0)->open(state));

    bool eos = false;
    std::unique_ptr<Block> input_block = Block::create_unique();
    do {
        RETURN_IF_ERROR(child(0)->get_next_after_projects(
                state, input_block.get(), &eos,
                std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
                                  ExecNode::get_next,
                          _children[0], std::placeholders::_1, std::placeholders::_2,
                          std::placeholders::_3)));
        RETURN_IF_ERROR(sink(state, input_block.get(), eos));
    } while (!eos);

    child(0)->close(state);

    return Status::OK();
}

Status VPartitionSortNode::alloc_resource(RuntimeState* state) {
    SCOPED_TIMER(_runtime_profile->total_time_counter());
    RETURN_IF_ERROR(ExecNode::alloc_resource(state));
    RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
    RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
    RETURN_IF_CANCELLED(state);
    RETURN_IF_ERROR(state->check_query_state("VPartitionSortNode, while open."));
    return Status::OK();
}

Status VPartitionSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block,
                                bool* eos) {
    RETURN_IF_CANCELLED(state);
    output_block->clear_column_data();
    bool current_eos = false;
    RETURN_IF_ERROR(get_sorted_block(state, output_block, &current_eos));
    if (_sort_idx >= _partition_sorts.size() && output_block->rows() == 0) {
        if (!_blocks_buffer.empty()) {
            _blocks_buffer.front().swap(*output_block);
            _blocks_buffer.pop();
        } else {
            *eos = true;
        }
    }
    return Status::OK();
}

Status VPartitionSortNode::get_next(RuntimeState* state, Block* output_block, bool* eos) {
    if (state == nullptr || output_block == nullptr || eos == nullptr) {
        return Status::InternalError("input is nullptr");
    }
    VLOG_CRITICAL << "VPartitionSortNode::get_next";
    SCOPED_TIMER(_runtime_profile->total_time_counter());

    return pull(state, output_block, eos);
}

Status VPartitionSortNode::get_sorted_block(RuntimeState* state, Block* output_block,
                                            bool* current_eos) {
    SCOPED_TIMER(_get_sorted_timer);
    // the sorters output their data one by one
    if (_sort_idx < _partition_sorts.size()) {
        RETURN_IF_ERROR(_partition_sorts[_sort_idx]->get_next(state, output_block, current_eos));
    }
    if (*current_eos) {
        // the current sorter reached eos, so move on to the next one
        _previous_row->reset();
        auto rows = _partition_sorts[_sort_idx]->get_output_rows();
        partition_profile_output_rows.push_back(rows);
        _num_rows_returned += rows;
        _partition_sorts[_sort_idx].reset(nullptr);
        _sort_idx++;
    }

    return Status::OK();
}

Status VPartitionSortNode::close(RuntimeState* state) {
    VLOG_CRITICAL << "VPartitionSortNode::close";
    if (is_closed()) {
        return Status::OK();
    }
    return ExecNode::close(state);
}

void VPartitionSortNode::release_resource(RuntimeState* state) {
    VExpr::close(_partition_expr_ctxs, state);
    _vsort_exec_exprs.close(state);
    ExecNode::release_resource(state);
}

void VPartitionSortNode::_init_hash_method() {
    if (_partition_exprs_num == 0) {
        return;
    } else if (_partition_exprs_num == 1) {
        auto is_nullable = _partition_expr_ctxs[0]->root()->is_nullable();
        switch (_partition_expr_ctxs[0]->root()->result_type()) {
        case TYPE_TINYINT:
        case TYPE_BOOLEAN:
            _partitioned_data->init(PartitionedHashMapVariants::Type::int8_key, is_nullable);
            return;
        case TYPE_SMALLINT:
            _partitioned_data->init(PartitionedHashMapVariants::Type::int16_key, is_nullable);
            return;
        case TYPE_INT:
        case TYPE_FLOAT:
        case TYPE_DATEV2:
            _partitioned_data->init(PartitionedHashMapVariants::Type::int32_key, is_nullable);
            return;
        case TYPE_BIGINT:
        case TYPE_DOUBLE:
        case TYPE_DATE:
        case TYPE_DATETIME:
        case TYPE_DATETIMEV2:
            _partitioned_data->init(PartitionedHashMapVariants::Type::int64_key, is_nullable);
            return;
        case TYPE_LARGEINT: {
            _partitioned_data->init(PartitionedHashMapVariants::Type::int128_key, is_nullable);
            return;
        }
        case TYPE_DECIMALV2:
        case TYPE_DECIMAL32:
        case TYPE_DECIMAL64:
        case TYPE_DECIMAL128I: {
            DataTypePtr& type_ptr = _partition_expr_ctxs[0]->root()->data_type();
            TypeIndex idx = is_nullable ? assert_cast<const DataTypeNullable&>(*type_ptr)
                                                  .get_nested_type()
                                                  ->get_type_id()
                                        : type_ptr->get_type_id();
            WhichDataType which(idx);
            if (which.is_decimal32()) {
                _partitioned_data->init(PartitionedHashMapVariants::Type::int32_key, is_nullable);
            } else if (which.is_decimal64()) {
                _partitioned_data->init(PartitionedHashMapVariants::Type::int64_key, is_nullable);
            } else {
                _partitioned_data->init(PartitionedHashMapVariants::Type::int128_key, is_nullable);
            }
            return;
        }
        case TYPE_CHAR:
        case TYPE_VARCHAR:
        case TYPE_STRING: {
            _partitioned_data->init(PartitionedHashMapVariants::Type::string_key, is_nullable);
            break;
        }
        default:
            _partitioned_data->init(PartitionedHashMapVariants::Type::serialized);
        }
    } else {
        bool use_fixed_key = true;
        bool has_null = false;
        int key_byte_size = 0;

        _partition_key_sz.resize(_partition_exprs_num);
        for (int i = 0; i < _partition_exprs_num; ++i) {
            const auto& data_type = _partition_expr_ctxs[i]->root()->data_type();

            if (!data_type->have_maximum_size_of_value()) {
                use_fixed_key = false;
                break;
            }

            auto is_null = data_type->is_nullable();
            has_null |= is_null;
            _partition_key_sz[i] =
                    data_type->get_maximum_size_of_value_in_memory() - (is_null ? 1 : 0);
            key_byte_size += _partition_key_sz[i];
        }

        if (std::tuple_size<KeysNullMap<UInt256>>::value + key_byte_size > sizeof(UInt256)) {
            use_fixed_key = false;
        }

        if (use_fixed_key) {
            if (has_null) {
                if (std::tuple_size<KeysNullMap<UInt64>>::value + key_byte_size <= sizeof(UInt64)) {
                    _partitioned_data->init(PartitionedHashMapVariants::Type::int64_keys, has_null);
                } else if (std::tuple_size<KeysNullMap<UInt128>>::value + key_byte_size <=
                           sizeof(UInt128)) {
                    _partitioned_data->init(PartitionedHashMapVariants::Type::int128_keys,
                                            has_null);
                } else {
                    _partitioned_data->init(PartitionedHashMapVariants::Type::int256_keys,
                                            has_null);
                }
            } else {
                if (key_byte_size <= sizeof(UInt64)) {
                    _partitioned_data->init(PartitionedHashMapVariants::Type::int64_keys, has_null);
                } else if (key_byte_size <= sizeof(UInt128)) {
                    _partitioned_data->init(PartitionedHashMapVariants::Type::int128_keys,
                                            has_null);
                } else {
                    _partitioned_data->init(PartitionedHashMapVariants::Type::int256_keys,
                                            has_null);
                }
            }
        } else {
            _partitioned_data->init(PartitionedHashMapVariants::Type::serialized);
        }
    }
}

void VPartitionSortNode::debug_profile() {
    fmt::memory_buffer partition_rows_read, partition_blocks_read;
    fmt::format_to(partition_rows_read, "[");
    fmt::format_to(partition_blocks_read, "[");
    for (auto place : _value_places) {
        fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows());
        fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size());
    }
    fmt::format_to(partition_rows_read, "]");
    fmt::format_to(partition_blocks_read, "]");

    runtime_profile()->add_info_string("PerPartitionBlocksRead", fmt::to_string(partition_blocks_read));
    runtime_profile()->add_info_string("PerPartitionRowsRead", fmt::to_string(partition_rows_read));
    fmt::memory_buffer partition_output_rows;
    fmt::format_to(partition_output_rows, "[");
    for (auto row : partition_profile_output_rows) {
        fmt::format_to(partition_output_rows, "{}, ", row);
    }
    fmt::format_to(partition_output_rows, "]");
    runtime_profile()->add_info_string("PerPartitionOutputRows", fmt::to_string(partition_output_rows));
}

} // namespace doris::vectorized
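`_init_hash_method` above packs all fixed-size partition keys into a single integer-keyed hash map when they fit, and falls back to serialized keys otherwise. A hedged sketch of the same size arithmetic follows; the per-width null-bitmap overheads are parameters here because they are assumptions (the real code takes them from `std::tuple_size<KeysNullMap<UIntN>>`):

```cpp
#include <cstdio>
#include <vector>

// Sketch of the fixed-key selection: sum the per-key byte sizes, add the
// null-bitmap overhead if any key is nullable, and pick the narrowest
// integer width that fits; otherwise fall back to serialized keys.
static const char* pick_variant(const std::vector<int>& key_sizes, bool has_null,
                                int bitmap_bytes_u64, int bitmap_bytes_u128,
                                int bitmap_bytes_u256) {
    int total = 0;
    for (int s : key_sizes) total += s;
    if (total + (has_null ? bitmap_bytes_u64 : 0) <= 8) return "int64_keys";
    if (total + (has_null ? bitmap_bytes_u128 : 0) <= 16) return "int128_keys";
    if (total + (has_null ? bitmap_bytes_u256 : 0) <= 32) return "int256_keys";
    return "serialized";
}

int main() {
    // e.g. PARTITION BY (int, bigint): 4 + 8 = 12 bytes, not nullable.
    std::printf("%s\n", pick_variant({4, 8}, false, 1, 2, 4));  // int128_keys
    // e.g. a nullable bigint + decimal128: 8 + 16 bytes plus the bitmap.
    std::printf("%s\n", pick_variant({8, 16}, true, 1, 2, 4));  // int256_keys
    return 0;
}
```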
be/src/vec/exec/vpartition_sort_node.h (new file, 386 lines)
@@ -0,0 +1,386 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <glog/logging.h>

#include <cstdint>
#include <memory>

#include "exec/exec_node.h"
#include "vec/columns/column.h"
#include "vec/common/columns_hashing.h"
#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/ph_hash_map.h"
#include "vec/common/hash_table/string_hash_map.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/vsort_exec_exprs.h"
#include "vec/core/block.h"

namespace doris {
namespace vectorized {
static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20;

struct PartitionBlocks {
public:
    PartitionBlocks() = default;
    ~PartitionBlocks() = default;

    void add_row_idx(size_t row) { selector.push_back(row); }

    void append_block_by_selector(const vectorized::Block* input_block,
                                  const RowDescriptor& row_desc, bool is_limit,
                                  int64_t partition_inner_limit, int batch_size) {
        if (blocks.empty() || reach_limit()) {
            init_rows = batch_size;
            blocks.push_back(Block::create_unique(VectorizedUtils::create_empty_block(row_desc)));
        }
        auto columns = input_block->get_columns();
        auto mutable_columns = blocks.back()->mutate_columns();
        DCHECK(columns.size() == mutable_columns.size());
        for (int i = 0; i < mutable_columns.size(); ++i) {
            columns[i]->append_data_by_selector(mutable_columns[i], selector);
        }
        init_rows = init_rows - selector.size();
        total_rows = total_rows + selector.size();
        selector.clear();
    }

    void append_whole_block(vectorized::Block* input_block, const RowDescriptor& row_desc) {
        auto empty_block = Block::create_unique(VectorizedUtils::create_empty_block(row_desc));
        empty_block->swap(*input_block);
        blocks.emplace_back(std::move(empty_block));
    }

    bool reach_limit() {
        return init_rows <= 0 || blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES;
    }

    size_t get_total_rows() const { return total_rows; }

    IColumn::Selector selector;
    std::vector<std::unique_ptr<Block>> blocks;
    size_t total_rows = 0;
    int init_rows = 4096;
};

using PartitionDataPtr = PartitionBlocks*;
using PartitionDataWithStringKey = PHHashMap<StringRef, PartitionDataPtr, DefaultHash<StringRef>>;
using PartitionDataWithShortStringKey = StringHashMap<PartitionDataPtr>;
using PartitionDataWithUInt32Key = PHHashMap<UInt32, PartitionDataPtr, HashCRC32<UInt32>>;

template <typename TData>
struct PartitionMethodSerialized {
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;
    using Iterator = typename Data::iterator;

    Data data;
    Iterator iterator;
    bool inited = false;
    std::vector<StringRef> keys;
    size_t keys_memory_usage = 0;
    PartitionMethodSerialized() : _serialized_key_buffer_size(0), _serialized_key_buffer(nullptr) {}

    using State = ColumnsHashing::HashMethodSerialized<typename Data::value_type, Mapped, true>;

    template <typename Other>
    explicit PartitionMethodSerialized(const Other& other) : data(other.data) {}

    size_t serialize_keys(const ColumnRawPtrs& key_columns, size_t num_rows) {
        if (keys.size() < num_rows) {
            keys.resize(num_rows);
        }

        size_t max_one_row_byte_size = 0;
        for (const auto& column : key_columns) {
            max_one_row_byte_size += column->get_max_row_byte_size();
        }
        size_t total_bytes = max_one_row_byte_size * num_rows;

        if (total_bytes > SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES) {
            // reach mem limit, don't serialize in batch
            // for simplicity, we just create a new arena here.
            _arena.reset(new Arena());
            size_t keys_size = key_columns.size();
            for (size_t i = 0; i < num_rows; ++i) {
                keys[i] = serialize_keys_to_pool_contiguous(i, keys_size, key_columns, *_arena);
            }
            keys_memory_usage = _arena->size();
        } else {
            _arena.reset();
            if (total_bytes > _serialized_key_buffer_size) {
                _serialized_key_buffer_size = total_bytes;
                _serialize_key_arena.reset(new Arena());
                _serialized_key_buffer = reinterpret_cast<uint8_t*>(
                        _serialize_key_arena->alloc(_serialized_key_buffer_size));
            }

            for (size_t i = 0; i < num_rows; ++i) {
                keys[i].data =
                        reinterpret_cast<char*>(_serialized_key_buffer + i * max_one_row_byte_size);
                keys[i].size = 0;
            }

            for (const auto& column : key_columns) {
                column->serialize_vec(keys, num_rows, max_one_row_byte_size);
            }
            keys_memory_usage = _serialized_key_buffer_size;
        }
        return max_one_row_byte_size;
    }

private:
    size_t _serialized_key_buffer_size;
    uint8_t* _serialized_key_buffer;
    std::unique_ptr<Arena> _serialize_key_arena;
    std::unique_ptr<Arena> _arena;
    static constexpr size_t SERIALIZE_KEYS_MEM_LIMIT_IN_BYTES = 16 * 1024 * 1024; // 16M
};

// for string keys
template <typename TData>
struct PartitionMethodStringNoCache {
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;
    using Iterator = typename Data::iterator;

    Data data;
    Iterator iterator;
    bool inited = false;

    PartitionMethodStringNoCache() = default;

    explicit PartitionMethodStringNoCache(size_t size_hint) : data(size_hint) {}

    template <typename Other>
    explicit PartitionMethodStringNoCache(const Other& other) : data(other.data) {}

    using State = ColumnsHashing::HashMethodString<typename Data::value_type, Mapped, true, false>;

    static const bool low_cardinality_optimization = false;
};

/// For the case where there is one numeric key.
/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
template <typename FieldType, typename TData, bool consecutive_keys_optimization = false>
struct PartitionMethodOneNumber {
    using Data = TData;
    using Key = typename Data::key_type;
    using Mapped = typename Data::mapped_type;
    using Iterator = typename Data::iterator;

    Data data;
    Iterator iterator;
    bool inited = false;

    PartitionMethodOneNumber() = default;

    template <typename Other>
    PartitionMethodOneNumber(const Other& other) : data(other.data) {}

    /// To use one `Method` in different threads, use different `State`.
    using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type, Mapped, FieldType,
                                                      consecutive_keys_optimization>;
};

template <typename Base>
struct PartitionDataWithNullKey : public Base {
    using Base::Base;

    bool& has_null_key_data() { return has_null_key; }
    PartitionDataPtr& get_null_key_data() { return null_key_data; }
    bool has_null_key_data() const { return has_null_key; }
    PartitionDataPtr get_null_key_data() const { return null_key_data; }
    size_t size() const { return Base::size() + (has_null_key ? 1 : 0); }
    bool empty() const { return Base::empty() && !has_null_key; }

    void clear() {
        Base::clear();
        has_null_key = false;
    }

    void clear_and_shrink() {
        Base::clear_and_shrink();
        has_null_key = false;
    }

private:
    bool has_null_key = false;
    PartitionDataPtr null_key_data = nullptr;
};

template <typename SingleColumnMethod>
struct PartitionMethodSingleNullableColumn : public SingleColumnMethod {
    using Base = SingleColumnMethod;
    using BaseState = typename Base::State;

    using Data = typename Base::Data;
    using Key = typename Base::Key;
    using Mapped = typename Base::Mapped;

    using Base::data;

    PartitionMethodSingleNullableColumn() = default;

    template <typename Other>
    explicit PartitionMethodSingleNullableColumn(const Other& other) : Base(other) {}

    using State = ColumnsHashing::HashMethodSingleLowNullableColumn<BaseState, Mapped, true>;
};

using PartitionedMethodVariants =
        std::variant<PartitionMethodSerialized<PartitionDataWithStringKey>,
                     PartitionMethodOneNumber<UInt32, PartitionDataWithUInt32Key>,
                     PartitionMethodSingleNullableColumn<PartitionMethodOneNumber<
                             UInt32, PartitionDataWithNullKey<PartitionDataWithUInt32Key>>>,
                     PartitionMethodStringNoCache<PartitionDataWithShortStringKey>,
                     PartitionMethodSingleNullableColumn<PartitionMethodStringNoCache<
                             PartitionDataWithNullKey<PartitionDataWithShortStringKey>>>>;

struct PartitionedHashMapVariants {
    PartitionedHashMapVariants() = default;
    PartitionedHashMapVariants(const PartitionedHashMapVariants&) = delete;
    PartitionedHashMapVariants& operator=(const PartitionedHashMapVariants&) = delete;
    PartitionedMethodVariants _partition_method_variant;

    enum class Type {
        EMPTY = 0,
        serialized,
        int8_key,
        int16_key,
        int32_key,
        int64_key,
        int128_key,
        int64_keys,
        int128_keys,
        int256_keys,
        string_key,
    };

    Type _type = Type::EMPTY;

    void init(Type type, bool is_nullable = false) {
        _type = type;
        switch (_type) {
        case Type::serialized:
            _partition_method_variant
                    .emplace<PartitionMethodSerialized<PartitionDataWithStringKey>>();
            break;
        case Type::int32_key:
            if (is_nullable) {
                _partition_method_variant
                        .emplace<PartitionMethodSingleNullableColumn<PartitionMethodOneNumber<
                                UInt32, PartitionDataWithNullKey<PartitionDataWithUInt32Key>>>>();
            } else {
                _partition_method_variant
                        .emplace<PartitionMethodOneNumber<UInt32, PartitionDataWithUInt32Key>>();
            }
            break;
        case Type::string_key:
            if (is_nullable) {
                _partition_method_variant
                        .emplace<PartitionMethodSingleNullableColumn<PartitionMethodStringNoCache<
                                PartitionDataWithNullKey<PartitionDataWithShortStringKey>>>>();
            } else {
                _partition_method_variant
                        .emplace<PartitionMethodStringNoCache<PartitionDataWithShortStringKey>>();
            }
            break;
        default:
            DCHECK(false) << "Do not have a right partition-by data type";
        }
    }
};

class VExprContext;

class VPartitionSortNode : public ExecNode {
public:
    VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
    ~VPartitionSortNode() override = default;

    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
    Status prepare(RuntimeState* state) override;
    Status alloc_resource(RuntimeState* state) override;
    Status open(RuntimeState* state) override;
    void release_resource(RuntimeState* state) override;
    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
    Status close(RuntimeState* state) override;

    Status pull(RuntimeState* state, vectorized::Block* output_block, bool* eos) override;
    Status sink(RuntimeState* state, vectorized::Block* input_block, bool eos) override;

    void debug_profile();

private:
    template <typename AggState, typename AggMethod>
    void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
                                    const ColumnRawPtrs& key_columns, const size_t num_rows) {
        if constexpr (ColumnsHashing::IsPreSerializedKeysHashMethodTraits<AggState>::value) {
            agg_method.serialize_keys(key_columns, num_rows);
            state.set_serialized_keys(agg_method.keys.data());
        }
    }

    void _init_hash_method();
    Status _split_block_by_partition(vectorized::Block* input_block, int batch_size);
    void _emplace_into_hash_table(const ColumnRawPtrs& key_columns,
                                  const vectorized::Block* input_block, int batch_size);
    Status get_sorted_block(RuntimeState* state, Block* output_block, bool* eos);

    // hash table
    std::unique_ptr<PartitionedHashMapVariants> _partitioned_data;
    std::unique_ptr<Arena> _agg_arena_pool;
    // partition by k1, k2, ...
    int _partition_exprs_num = 0;
    std::vector<VExprContext*> _partition_expr_ctxs;
    std::vector<const IColumn*> _partition_columns;
    std::vector<size_t> _partition_key_sz;
    std::vector<size_t> _hash_values;

    std::vector<std::unique_ptr<PartitionSorter>> _partition_sorts;
    std::vector<PartitionDataPtr> _value_places;
    // Expressions and parameters used to build _sort_description
    VSortExecExprs _vsort_exec_exprs;
    std::vector<bool> _is_asc_order;
    std::vector<bool> _nulls_first;
    TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
    bool _has_global_limit = false;
    int _num_partition = 0;
    int64_t _partition_inner_limit = 0;
    int _sort_idx = 0;
    std::unique_ptr<SortCursorCmp> _previous_row = nullptr;
    std::queue<Block> _blocks_buffer;
    int64_t child_input_rows = 0;

    RuntimeProfile::Counter* _build_timer;
    RuntimeProfile::Counter* _emplace_key_timer;
    RuntimeProfile::Counter* _partition_sort_timer;
    RuntimeProfile::Counter* _get_sorted_timer;
    RuntimeProfile::Counter* _selector_block_timer;

    RuntimeProfile::Counter* _hash_table_size_counter;
    // only for profile recording
    std::vector<int> partition_profile_output_rows;
};

} // namespace vectorized
} // namespace doris
@@ -57,6 +57,7 @@ enum TPlanNodeType {
    FILE_SCAN_NODE,
    JDBC_SCAN_NODE,
    TEST_EXTERNAL_SCAN_NODE,
    PARTITION_SORT_NODE,
}

// phases of an execution node
@@ -765,6 +766,19 @@ struct TSortNode {
    7: optional bool use_topn_opt
}

enum TopNAlgorithm {
    RANK,
    DENSE_RANK,
    ROW_NUMBER
}

struct TPartitionSortNode {
    1: optional list<Exprs.TExpr> partition_exprs
    2: optional TSortInfo sort_info
    3: optional bool has_global_limit
    4: optional TopNAlgorithm top_n_algorithm
    5: optional i64 partition_inner_limit
}
enum TAnalyticWindowType {
    // Specifies the window as a logical offset
    RANGE,
@@ -1072,6 +1086,7 @@ struct TPlanNode {

    101: optional list<Exprs.TExpr> projections
    102: optional Types.TTupleId output_tuple_id
    103: optional TPartitionSortNode partition_sort_node
}

// A flattened representation of a tree of PlanNodes, obtained by depth-first
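As a closing illustration of how the new plan node is parameterized, here is a hypothetical, standalone C++ mirror of the Thrift struct above (the real types are generated from PlanNodes.thrift and populated by the FE planner, not written by hand):

```cpp
#include <cstdint>
#include <cstdio>

// Standalone mirror of the Thrift definitions above; names follow the .thrift
// fields but this is not the generated code.
enum class TopNAlgorithm { RANK, DENSE_RANK, ROW_NUMBER };

struct TPartitionSortNodeMirror {
    // partition_exprs and sort_info omitted: TExpr trees are out of scope here.
    bool has_global_limit = false;
    TopNAlgorithm top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
    int64_t partition_inner_limit = 0;
};

int main() {
    // What the planner would conceptually emit for:
    //   select * from (select *, row_number() over (partition by k1 order by k2) rn
    //                  from t) v where rn <= 10;
    TPartitionSortNodeMirror node;
    node.has_global_limit = false; // the limit applies per partition
    node.top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
    node.partition_inner_limit = 10; // keep at most 10 rows per partition
    std::printf("limit per partition: %lld\n", (long long)node.partition_inner_limit);
    return 0;
}
```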