[pipelineX](operator) support partition sort operator and distinct streaming agg operator (#24544)

zhangstar333
2023-09-20 09:50:51 +08:00
committed by GitHub
parent 7e17e0d3f7
commit a71d7f2beb
21 changed files with 1277 additions and 25 deletions

View File

@@ -19,6 +19,7 @@
#include <string>
#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/streaming_aggregation_sink_operator.h"
#include "runtime/primitive_type.h"
@@ -946,10 +947,12 @@ Status AggSinkLocalState<DependencyType, Derived>::close(RuntimeState* state) {
}
class StreamingAggSinkLocalState;
class DistinctStreamingAggSinkLocalState;
template class AggSinkOperatorX<BlockingAggSinkLocalState>;
template class AggSinkOperatorX<StreamingAggSinkLocalState>;
template class AggSinkOperatorX<DistinctStreamingAggSinkLocalState>;
template class AggSinkLocalState<AggDependency, BlockingAggSinkLocalState>;
template class AggSinkLocalState<AggDependency, StreamingAggSinkLocalState>;
template class AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState>;
} // namespace doris::pipeline

View File

@@ -352,6 +352,7 @@ protected:
template <typename DependencyType, typename Derived>
friend class AggSinkLocalState;
friend class StreamingAggSinkLocalState;
friend class DistinctStreamingAggSinkLocalState;
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
bool _can_short_circuit = false;

View File

@@ -65,6 +65,8 @@ protected:
friend class AggSourceOperatorX;
friend class StreamingAggSourceOperatorX;
friend class StreamingAggSinkOperatorX;
friend class DistinctStreamingAggSourceOperatorX;
friend class DistinctStreamingAggSinkOperatorX;
void _close_without_key();
void _close_with_serialized_key();

View File

@@ -19,6 +19,7 @@
#include <gen_cpp/Metrics_types.h>
#include <memory>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
@@ -94,4 +95,169 @@ OperatorPtr DistinctStreamingAggSinkOperatorBuilder::build_operator() {
return std::make_shared<DistinctStreamingAggSinkOperator>(this, _node, _data_queue);
}
DistinctStreamingAggSinkLocalState::DistinctStreamingAggSinkLocalState(
DataSinkOperatorXBase* parent, RuntimeState* state)
: AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState>(parent, state),
dummy_mapped_data(std::make_shared<char>('A')) {}
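// Evaluates the group-by exprs against in_block, deduplicates the rows through the
// hash table, and copies only the first occurrence of each key into out_block.
// No aggregate functions are evaluated here: this sink is only chosen for pure
// DISTINCT plans (see _create_operator below, where aggregate_functions is empty).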
Status DistinctStreamingAggSinkLocalState::_distinct_pre_agg_with_serialized_key(
doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
SCOPED_TIMER(_build_timer);
DCHECK(!_shared_state->probe_expr_ctxs.empty());
size_t key_size = _shared_state->probe_expr_ctxs.size();
vectorized::ColumnRawPtrs key_columns(key_size);
{
SCOPED_TIMER(_expr_timer);
for (size_t i = 0; i < key_size; ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(
_shared_state->probe_expr_ctxs[i]->execute(in_block, &result_column_id));
in_block->get_by_position(result_column_id).column =
in_block->get_by_position(result_column_id)
.column->convert_to_full_column_if_const();
key_columns[i] = in_block->get_by_position(result_column_id).column.get();
}
}
int rows = in_block->rows();
_distinct_row.clear();
_distinct_row.reserve(rows);
RETURN_IF_CATCH_EXCEPTION(
_emplace_into_hash_table_to_distinct(_distinct_row, key_columns, rows));
bool mem_reuse = _dependency->make_nullable_keys().empty() && out_block->mem_reuse();
if (mem_reuse) {
for (int i = 0; i < key_size; ++i) {
auto dst = out_block->get_by_position(i).column->assume_mutable();
key_columns[i]->append_data_by_selector(dst, _distinct_row);
}
} else {
vectorized::ColumnsWithTypeAndName columns_with_schema;
for (int i = 0; i < key_size; ++i) {
auto distinct_column = key_columns[i]->clone_empty();
key_columns[i]->append_data_by_selector(distinct_column, _distinct_row);
columns_with_schema.emplace_back(
std::move(distinct_column),
_shared_state->probe_expr_ctxs[i]->root()->data_type(),
_shared_state->probe_expr_ctxs[i]->root()->expr_name());
}
out_block->swap(vectorized::Block(columns_with_schema));
}
return Status::OK();
}
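// A row is distinct exactly when its key inserts a new entry into the hash table,
// so the selector collects the indices of freshly inserted keys. For phmap-backed
// tables the hashes are computed up front and used to prefetch HASH_MAP_PREFETCH_DIST
// slots ahead of the emplace loop.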
void DistinctStreamingAggSinkLocalState::_emplace_into_hash_table_to_distinct(
vectorized::IColumn::Selector& distinct_row, vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows) {
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(_hash_table_compute_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using HashTableType = std::decay_t<decltype(agg_method.data)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, _shared_state->probe_key_sz, nullptr);
_pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
auto keys = state.get_keys(num_rows);
if (_hash_values.size() < num_rows) {
_hash_values.resize(num_rows);
}
for (size_t i = 0; i < num_rows; ++i) {
_hash_values[i] = agg_method.data.hash(keys[i]);
}
SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
if (LIKELY(i + HASH_MAP_PREFETCH_DIST < num_rows)) {
agg_method.data.prefetch_by_hash(
_hash_values[i + HASH_MAP_PREFETCH_DIST]);
}
auto result = state.emplace_with_key(
agg_method.data, state.pack_key_holder(keys[i], *_agg_arena_pool),
_hash_values[i], i);
if (result.is_inserted()) {
distinct_row.push_back(i);
}
}
} else {
SCOPED_TIMER(_hash_table_emplace_timer);
for (size_t i = 0; i < num_rows; ++i) {
auto result = state.emplace_key(agg_method.data, i, *_agg_arena_pool);
if (result.is_inserted()) {
result.set_mapped(dummy_mapped_data.get());
distinct_row.push_back(i);
}
}
}
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
},
_agg_data->method_variant);
}
DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool, tnode, descs) {}
Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(AggSinkOperatorX<DistinctStreamingAggSinkLocalState>::init(tnode, state));
_name = "DISTINCT_STREAMING_AGGREGATION_SINK_OPERATOR";
return Status::OK();
}
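// Sink flow: deduplicated rows accumulate in _output_block; the block is pushed to
// the shared data queue once it reaches batch_size or the query limit, and EOF is
// signalled when the limit is hit or upstream reports SourceState::FINISHED.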
Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) {
auto& local_state =
state->get_sink_local_state(id())->cast<DistinctStreamingAggSinkLocalState>();
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
local_state._shared_state->input_num_rows += in_block->rows();
Status ret = Status::OK();
if (in_block && in_block->rows() > 0) {
if (local_state._output_block == nullptr) {
local_state._output_block = local_state._shared_state->data_queue->get_free_block();
}
RETURN_IF_ERROR(local_state._distinct_pre_agg_with_serialized_key(
in_block, local_state._output_block.get()));
// once we have enough data or have reached the row limit, push the block to the queue
if (_limit != -1 &&
(local_state._output_block->rows() + local_state._output_distinct_rows) >= _limit) {
auto limit_rows = _limit - local_state._output_distinct_rows;
local_state._output_block->set_num_rows(limit_rows);
local_state._output_distinct_rows += limit_rows;
local_state._shared_state->data_queue->push_block(std::move(local_state._output_block));
} else if (local_state._output_block->rows() >= state->batch_size()) {
local_state._output_distinct_rows += local_state._output_block->rows();
local_state._shared_state->data_queue->push_block(std::move(local_state._output_block));
}
}
// the limit was reached or the source has finished
if ((UNLIKELY(source_state == SourceState::FINISHED)) ||
(_limit != -1 && local_state._output_distinct_rows >= _limit)) {
if (local_state._output_block != nullptr) { // this may be the last block, sent with eos
local_state._output_distinct_rows += local_state._output_block->rows();
local_state._shared_state->data_queue->push_block(std::move(local_state._output_block));
}
local_state._shared_state->data_queue->set_finish();
return Status::Error<ErrorCode::END_OF_FILE>(""); // the finish signal must be given here
}
return Status::OK();
}
Status DistinctStreamingAggSinkLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
if (_shared_state->data_queue && !_shared_state->data_queue->is_finish()) {
// finish should already be set; if it is not set here, an error occurred.
_shared_state->data_queue->set_canceled();
}
return Base::close(state);
}
} // namespace doris::pipeline

View File

@@ -22,8 +22,11 @@
#include <cstdint>
#include <memory>
#include "aggregation_sink_operator.h"
#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/exec/distinct_vaggregation_node.h"
@@ -72,5 +75,50 @@ private:
std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique();
};
class DistinctStreamingAggSinkOperatorX;
class DistinctStreamingAggSinkLocalState final
: public AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState> {
public:
using Parent = DistinctStreamingAggSinkOperatorX;
using Base = AggSinkLocalState<AggDependency, DistinctStreamingAggSinkLocalState>;
ENABLE_FACTORY_CREATOR(DistinctStreamingAggSinkLocalState);
DistinctStreamingAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
RETURN_IF_ERROR(Base::init(state, info));
_shared_state->data_queue.reset(new DataQueue(1, _dependency));
return Status::OK();
}
Status close(RuntimeState* state) override;
Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
vectorized::Block* out_block);
private:
friend class DistinctStreamingAggSinkOperatorX;
void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& distinct_row,
vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows);
std::unique_ptr<vectorized::Block> _output_block = vectorized::Block::create_unique();
std::shared_ptr<char> dummy_mapped_data = nullptr;
vectorized::IColumn::Selector _distinct_row;
int64_t _output_distinct_rows = 0;
};
class DistinctStreamingAggSinkOperatorX final
: public AggSinkOperatorX<DistinctStreamingAggSinkLocalState> {
public:
DistinctStreamingAggSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
WriteDependency* wait_for_dependency(RuntimeState* state) override {
return state->get_local_state(id())->cast<AggLocalState>()._dependency->write_blocked_by();
}
};
} // namespace pipeline
} // namespace doris

View File

@@ -88,5 +88,53 @@ OperatorPtr DistinctStreamingAggSourceOperatorBuilder::build_operator() {
return std::make_shared<DistinctStreamingAggSourceOperator>(this, _node, _data_queue);
}
DistinctStreamingAggSourceOperatorX::DistinctStreamingAggSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: Base(pool, tnode, descs) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
DCHECK(!tnode.agg_node.grouping_exprs.empty()) << "Streaming preaggs do grouping";
DCHECK(_limit == -1) << "Preaggs have no limits";
}
} else {
_is_streaming_preagg = false;
}
}
Status DistinctStreamingAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
auto& local_state = state->get_local_state(id())->cast<AggLocalState>();
SCOPED_TIMER(local_state.profile()->total_time_counter());
std::unique_ptr<vectorized::Block> agg_block;
RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block));
if (agg_block != nullptr) {
block->swap(*agg_block);
agg_block->clear_column_data(block->columns());
local_state._shared_state->data_queue->push_free_block(std::move(agg_block));
}
local_state._dependency->_make_nullable_output_key(block);
if (_is_streaming_preagg == false) {
// handle the having clause; it should not be executed in the pre-streaming agg
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
}
if (UNLIKELY(local_state._shared_state->data_queue->data_exhausted())) {
source_state = SourceState::FINISHED;
} else {
source_state = SourceState::DEPEND_ON_SOURCE;
}
return Status::OK();
}
Status DistinctStreamingAggSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(Base::init(tnode, state));
_op_name = "DISTINCT_STREAMING_AGGREGATION_OPERATOR";
return Status::OK();
}
} // namespace pipeline
} // namespace doris

View File

@@ -23,6 +23,7 @@
#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "vec/exec/distinct_vaggregation_node.h"
#include "vec/exec/vaggregation_node.h"
@@ -63,5 +64,19 @@ private:
std::shared_ptr<DataQueue> _data_queue;
};
class DistinctStreamingAggSourceOperatorX final : public AggSourceOperatorX {
public:
using Base = AggSourceOperatorX;
DistinctStreamingAggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs);
~DistinctStreamingAggSourceOperatorX() = default;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
bool _is_streaming_preagg = false;
};
} // namespace pipeline
} // namespace doris

View File

@@ -0,0 +1,367 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "partition_sort_sink_operator.h"
#include "common/status.h"
namespace doris {
namespace pipeline {
OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSinkOperator>(this, _node);
}
Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<PartitionSortDependency>::init(state, info));
auto& p = _parent->cast<PartitionSortSinkOperatorX>();
RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
_partition_expr_ctxs.resize(p._partition_expr_ctxs.size());
_partition_columns.resize(p._partition_expr_ctxs.size());
for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i]));
}
_partition_exprs_num = p._partition_exprs_num;
_partitioned_data = std::make_unique<vectorized::PartitionedHashMapVariants>();
_agg_arena_pool = std::make_unique<vectorized::Arena>();
_hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT);
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
_partition_sort_timer = ADD_TIMER(_profile, "PartitionSortTime");
_get_sorted_timer = ADD_TIMER(_profile, "GetSortedTime");
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
_init_hash_method();
return Status::OK();
}
PartitionSortSinkOperatorX::PartitionSortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: DataSinkOperatorX(tnode.node_id),
_pool(pool),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_limit(tnode.limit) {}
Status PartitionSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
//order by key
if (tnode.partition_sort_node.__isset.sort_info) {
RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.partition_sort_node.sort_info, _pool));
_is_asc_order = tnode.partition_sort_node.sort_info.is_asc_order;
_nulls_first = tnode.partition_sort_node.sort_info.nulls_first;
}
//partition by key
if (tnode.partition_sort_node.__isset.partition_exprs) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(
tnode.partition_sort_node.partition_exprs, _partition_expr_ctxs));
_partition_exprs_num = _partition_expr_ctxs.size();
}
_has_global_limit = tnode.partition_sort_node.has_global_limit;
_top_n_algorithm = tnode.partition_sort_node.top_n_algorithm;
_partition_inner_limit = tnode.partition_sort_node.partition_inner_limit;
return Status::OK();
}
Status PartitionSortSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child_x->row_desc(), _row_descriptor));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state, _child_x->row_desc()));
return Status::OK();
}
Status PartitionSortSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
RETURN_IF_ERROR(vectorized::VExpr::open(_partition_expr_ctxs, state));
return Status::OK();
}
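// Sink flow: rows are hashed into per-partition PartitionBlocks. If the partition
// count exceeds config::partition_topn_partition_threshold while the input stays
// comparatively small, partitioning is bypassed and blocks go straight to the
// source through blocks_buffer. On FINISHED, one PartitionSorter is built per
// partition for the source to drain.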
Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block* input_block,
SourceState source_state) {
auto& local_state = state->get_sink_local_state(id())->cast<PartitionSortSinkLocalState>();
auto current_rows = input_block->rows();
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (current_rows > 0) {
local_state.child_input_rows = local_state.child_input_rows + current_rows;
if (UNLIKELY(_partition_exprs_num == 0)) {
if (UNLIKELY(local_state._value_places.empty())) {
local_state._value_places.push_back(_pool->add(new vectorized::PartitionBlocks()));
}
//no partition key
local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc());
} else {
// simply use the partition count as the threshold check
if (local_state._num_partition > config::partition_topn_partition_threshold &&
local_state.child_input_rows < 10000 * local_state._num_partition) {
{
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
local_state._shared_state->blocks_buffer.push(std::move(*input_block));
// the buffer has data, so the source can read it.
local_state._dependency->set_ready_for_read();
}
} else {
RETURN_IF_ERROR(
_split_block_by_partition(input_block, state->batch_size(), local_state));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(
state->check_query_state("VPartitionSortNode, while split input block."));
input_block->clear_column_data();
}
}
}
if (source_state == SourceState::FINISHED) {
// the hash table is no longer needed here, so its memory can be freed
local_state._agg_arena_pool.reset(nullptr);
local_state._partitioned_data.reset(nullptr);
for (int i = 0; i < local_state._value_places.size(); ++i) {
auto sorter = vectorized::PartitionSorter::create_unique(
_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first,
_child_x->row_desc(), state, i == 0 ? local_state._profile : nullptr,
_has_global_limit, _partition_inner_limit, _top_n_algorithm,
local_state._shared_state->previous_row.get());
DCHECK(_child_x->row_desc().num_materialized_slots() ==
local_state._value_places[i]->blocks.back()->columns());
// get the blocks from every partition and feed them to the sorter.
for (const auto& block : local_state._value_places[i]->blocks) {
RETURN_IF_ERROR(sorter->append_block(block.get()));
}
sorter->init_profile(local_state._profile);
RETURN_IF_ERROR(sorter->prepare_for_read());
local_state._shared_state->partition_sorts.push_back(std::move(sorter));
}
COUNTER_SET(local_state._hash_table_size_counter, int64_t(local_state._num_partition));
// all data from the child has now been sunk
local_state._dependency->set_ready_for_read();
}
return Status::OK();
}
Status PartitionSortSinkOperatorX::_split_block_by_partition(
vectorized::Block* input_block, int batch_size, PartitionSortSinkLocalState& local_state) {
for (int i = 0; i < _partition_exprs_num; ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id));
DCHECK(result_column_id != -1);
local_state._partition_columns[i] =
input_block->get_by_position(result_column_id).column.get();
}
_emplace_into_hash_table(local_state._partition_columns, input_block, batch_size, local_state);
return Status::OK();
}
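// Each distinct partition key owns one PartitionBlocks: a newly inserted key
// allocates a fresh one, every row's index is recorded in its partition's
// selector, and the selectors are then flushed into blocks partition by partition.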
void PartitionSortSinkOperatorX::_emplace_into_hash_table(
const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block,
int batch_size, PartitionSortSinkLocalState& local_state) {
std::visit(
[&](auto&& agg_method) -> void {
SCOPED_TIMER(local_state._build_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using HashTableType = std::decay_t<decltype(agg_method.data)>;
using AggState = typename HashMethodType::State;
AggState state(key_columns, local_state._partition_key_sz, nullptr);
size_t num_rows = input_block->rows();
_pre_serialize_key_if_need(state, agg_method, key_columns, num_rows);
//PHHashMap
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
if (local_state._hash_values.size() < num_rows) {
local_state._hash_values.resize(num_rows);
}
if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
AggState>::value) {
for (size_t i = 0; i < num_rows; ++i) {
local_state._hash_values[i] = agg_method.data.hash(agg_method.keys[i]);
}
} else {
for (size_t i = 0; i < num_rows; ++i) {
local_state._hash_values[i] = agg_method.data.hash(
state.get_key_holder(i, *local_state._agg_arena_pool));
}
}
}
for (size_t row = 0; row < num_rows; ++row) {
SCOPED_TIMER(local_state._emplace_key_timer);
vectorized::PartitionDataPtr aggregate_data = nullptr;
auto emplace_result = [&]() {
if constexpr (HashTableTraits<HashTableType>::is_phmap) {
if (LIKELY(row + HASH_MAP_PREFETCH_DIST < num_rows)) {
agg_method.data.prefetch_by_hash(
local_state._hash_values[row + HASH_MAP_PREFETCH_DIST]);
}
return state.emplace_key(agg_method.data, local_state._hash_values[row],
row, *local_state._agg_arena_pool);
} else {
return state.emplace_key(agg_method.data, row,
*local_state._agg_arena_pool);
}
}();
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.is_inserted()) {
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.set_mapped(nullptr);
aggregate_data = _pool->add(new vectorized::PartitionBlocks());
emplace_result.set_mapped(aggregate_data);
local_state._value_places.push_back(aggregate_data);
local_state._num_partition++;
} else {
aggregate_data = emplace_result.get_mapped();
}
assert(aggregate_data != nullptr);
aggregate_data->add_row_idx(row);
}
for (auto place : local_state._value_places) {
SCOPED_TIMER(local_state._selector_block_timer);
place->append_block_by_selector(input_block, _child_x->row_desc(),
_has_global_limit, _partition_inner_limit,
batch_size);
}
},
local_state._partitioned_data->method_variant);
}
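// Picks the hash table layout from the partition key types: single numeric keys map
// to fixed-width tables, multiple fixed-size keys are packed into the smallest
// integer (plus a null bitmap) that fits, and everything else falls back to
// serialized keys.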
void PartitionSortSinkLocalState::_init_hash_method() {
if (_partition_exprs_num == 0) {
return;
} else if (_partition_exprs_num == 1) {
auto is_nullable = _partition_expr_ctxs[0]->root()->is_nullable();
switch (_partition_expr_ctxs[0]->root()->result_type()) {
case TYPE_TINYINT:
case TYPE_BOOLEAN:
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int8_key,
is_nullable);
return;
case TYPE_SMALLINT:
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int16_key,
is_nullable);
return;
case TYPE_INT:
case TYPE_FLOAT:
case TYPE_DATEV2:
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int32_key,
is_nullable);
return;
case TYPE_BIGINT:
case TYPE_DOUBLE:
case TYPE_DATE:
case TYPE_DATETIME:
case TYPE_DATETIMEV2:
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int64_key,
is_nullable);
return;
case TYPE_LARGEINT: {
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int128_key,
is_nullable);
return;
}
case TYPE_DECIMALV2:
case TYPE_DECIMAL32:
case TYPE_DECIMAL64:
case TYPE_DECIMAL128I: {
vectorized::DataTypePtr& type_ptr = _partition_expr_ctxs[0]->root()->data_type();
vectorized::TypeIndex idx =
is_nullable ? assert_cast<const vectorized::DataTypeNullable&>(*type_ptr)
.get_nested_type()
->get_type_id()
: type_ptr->get_type_id();
vectorized::WhichDataType which(idx);
if (which.is_decimal32()) {
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int32_key,
is_nullable);
} else if (which.is_decimal64()) {
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int64_key,
is_nullable);
} else {
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::int128_key,
is_nullable);
}
return;
}
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::string_key,
is_nullable);
break;
}
default:
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::serialized);
}
} else {
bool use_fixed_key = true;
bool has_null = false;
size_t key_byte_size = 0;
size_t bitmap_size = vectorized::get_bitmap_size(_partition_exprs_num);
_partition_key_sz.resize(_partition_exprs_num);
for (int i = 0; i < _partition_exprs_num; ++i) {
const auto& data_type = _partition_expr_ctxs[i]->root()->data_type();
if (!data_type->have_maximum_size_of_value()) {
use_fixed_key = false;
break;
}
auto is_null = data_type->is_nullable();
has_null |= is_null;
_partition_key_sz[i] =
data_type->get_maximum_size_of_value_in_memory() - (is_null ? 1 : 0);
key_byte_size += _partition_key_sz[i];
}
if (bitmap_size + key_byte_size > sizeof(vectorized::UInt256)) {
use_fixed_key = false;
}
if (use_fixed_key) {
if (has_null) {
if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt64)) {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int64_keys, has_null);
} else if (bitmap_size + key_byte_size <= sizeof(vectorized::UInt128)) {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int128_keys, has_null);
} else {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int256_keys, has_null);
}
} else {
if (key_byte_size <= sizeof(vectorized::UInt64)) {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int64_keys, has_null);
} else if (key_byte_size <= sizeof(vectorized::UInt128)) {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int128_keys, has_null);
} else {
_partitioned_data->init(
vectorized::PartitionedHashMapVariants::Type::int256_keys, has_null);
}
}
} else {
_partitioned_data->init(vectorized::PartitionedHashMapVariants::Type::serialized);
}
}
}
} // namespace pipeline
} // namespace doris

View File

@@ -19,7 +19,11 @@
#include <stdint.h>
#include <cstdint>
#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/exec/vpartition_sort_node.h"
namespace doris {
@@ -46,9 +50,90 @@ public:
bool can_write() override { return true; }
};
OperatorPtr PartitionSortSinkOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSinkOperator>(this, _node);
}
class PartitionSortSinkOperatorX;
class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSortDependency> {
ENABLE_FACTORY_CREATOR(PartitionSortSinkLocalState);
public:
PartitionSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<PartitionSortDependency>(parent, state) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
private:
friend class PartitionSortSinkOperatorX;
// Expressions and parameters used to build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;
vectorized::VExprContextSPtrs _partition_expr_ctxs;
int64_t child_input_rows = 0;
std::vector<vectorized::PartitionDataPtr> _value_places;
int _num_partition = 0;
std::vector<const vectorized::IColumn*> _partition_columns;
std::vector<size_t> _hash_values;
std::unique_ptr<vectorized::PartitionedHashMapVariants> _partitioned_data;
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
std::vector<size_t> _partition_key_sz;
int _partition_exprs_num = 0;
RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _emplace_key_timer;
RuntimeProfile::Counter* _partition_sort_timer;
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _selector_block_timer;
RuntimeProfile::Counter* _hash_table_size_counter;
void _init_hash_method();
};
class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortSinkLocalState> {
public:
PartitionSortSinkOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
DataSinkOperatorX<PartitionSortSinkLocalState>::_name);
}
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
private:
friend class PartitionSortSinkLocalState;
ObjectPool* _pool;
const RowDescriptor _row_descriptor;
int64_t _limit = -1;
int _partition_exprs_num = 0;
vectorized::VExprContextSPtrs _partition_expr_ctxs;
// Expressions and parameters used to build _sort_description
vectorized::VSortExecExprs _vsort_exec_exprs;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
bool _has_global_limit = false;
int64_t _partition_inner_limit = 0;
Status _split_block_by_partition(vectorized::Block* input_block, int batch_size,
PartitionSortSinkLocalState& local_state);
void _emplace_into_hash_table(const vectorized::ColumnRawPtrs& key_columns,
const vectorized::Block* input_block, int batch_size,
PartitionSortSinkLocalState& local_state);
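// For hash methods operating on pre-serialized keys, serialize all key columns once
// up front so the per-row emplace loop can reuse the packed keys.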
template <typename AggState, typename AggMethod>
void _pre_serialize_key_if_need(AggState& state, AggMethod& agg_method,
const vectorized::ColumnRawPtrs& key_columns,
const size_t num_rows) {
if constexpr (vectorized::ColumnsHashing::IsPreSerializedKeysHashMethodTraits<
AggState>::value) {
(agg_method.serialize_keys(key_columns, num_rows));
state.set_serialized_keys(agg_method.keys.data());
}
}
};
} // namespace pipeline
} // namespace doris

View File

@@ -0,0 +1,103 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "partition_sort_source_operator.h"
#include "pipeline/exec/operator.h"
namespace doris {
class ExecNode;
class RuntimeState;
namespace pipeline {
OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSourceOperator>(this, _node);
}
Status PartitionSortSourceLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
_shared_state->previous_row = nullptr;
_shared_state->partition_sorts.clear();
return PipelineXLocalState<PartitionSortDependency>::close(state);
}
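// Two read paths: blocks parked in blocks_buffer by the sink's pass-through mode are
// returned first; once the buffer is empty, sorted output is pulled from the
// per-partition sorters until sort_idx walks past the last one.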
Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* output_block,
SourceState& source_state) {
RETURN_IF_CANCELLED(state);
auto& local_state = state->get_local_state(id())->cast<PartitionSortSourceLocalState>();
output_block->clear_column_data();
{
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
if (local_state._shared_state->blocks_buffer.empty() == false) {
local_state._shared_state->blocks_buffer.front().swap(*output_block);
local_state._shared_state->blocks_buffer.pop();
// if the buffer has no data, block reading and wait for the signal again
if (local_state._shared_state->blocks_buffer.empty()) {
local_state._dependency->block_reading();
}
return Status::OK();
}
}
// is_ready_for_read is set by the sink node via local_state._dependency->set_ready_for_read()
RETURN_IF_ERROR(get_sorted_block(state, output_block, local_state));
{
std::lock_guard<std::mutex> lock(local_state._shared_state->buffer_mutex);
if (local_state._shared_state->blocks_buffer.empty() &&
local_state._shared_state->sort_idx >=
local_state._shared_state->partition_sorts.size()) {
source_state = SourceState::FINISHED;
}
}
return Status::OK();
}
Dependency* PartitionSortSourceOperatorX::wait_for_dependency(RuntimeState* state) {
auto& local_state = state->get_local_state(id())->cast<PartitionSortSourceLocalState>();
return local_state._dependency->read_blocked_by();
}
Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
vectorized::Block* output_block,
PartitionSortSourceLocalState& local_state) {
SCOPED_TIMER(local_state._get_sorted_timer);
// the sorters output their data one by one
bool current_eos = false;
if (local_state._shared_state->sort_idx < local_state._shared_state->partition_sorts.size()) {
RETURN_IF_ERROR(
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx]
->get_next(state, output_block, &current_eos));
}
if (current_eos) {
// the current sorter reached eos, so advance to the next index
local_state._shared_state->previous_row->reset();
auto rows = local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx]
->get_output_rows();
local_state._num_rows_returned += rows;
local_state._shared_state->partition_sorts[local_state._shared_state->sort_idx].reset(
nullptr);
local_state._shared_state->sort_idx++;
}
return Status::OK();
}
} // namespace pipeline
} // namespace doris

View File

@@ -21,6 +21,7 @@
#include "common/status.h"
#include "operator.h"
#include "pipeline/pipeline_x/operator.h"
#include "vec/exec/vpartition_sort_node.h"
namespace doris {
@@ -48,9 +49,53 @@ public:
Status open(RuntimeState*) override { return Status::OK(); }
};
OperatorPtr PartitionSortSourceOperatorBuilder::build_operator() {
return std::make_shared<PartitionSortSourceOperator>(this, _node);
}
class PartitionSortSourceOperatorX;
class PartitionSortSourceLocalState final : public PipelineXLocalState<PartitionSortDependency> {
ENABLE_FACTORY_CREATOR(PartitionSortSourceLocalState);
public:
using Base = PipelineXLocalState<PartitionSortDependency>;
PartitionSortSourceLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<PartitionSortDependency>(state, parent),
_get_next_timer(nullptr) {}
Status init(RuntimeState* state, LocalStateInfo& info) override {
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortDependency>::init(state, info));
_get_next_timer = ADD_TIMER(profile(), "GetResultTime");
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_shared_state->previous_row = std::make_unique<vectorized::SortCursorCmp>();
return Status::OK();
}
Status close(RuntimeState* state) override;
int64_t _num_rows_returned = 0;
private:
friend class PartitionSortSourceOperatorX;
RuntimeProfile::Counter* _get_sorted_timer;
RuntimeProfile::Counter* _get_next_timer = nullptr;
};
class PartitionSortSourceOperatorX final : public OperatorX<PartitionSortSourceLocalState> {
public:
using Base = OperatorX<PartitionSortSourceLocalState>;
PartitionSortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: OperatorX<PartitionSortSourceLocalState>(pool, tnode, descs) {}
Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;
Dependency* wait_for_dependency(RuntimeState* state) override;
bool is_source() const override { return true; }
private:
friend class PartitionSortSourceLocalState;
Status get_sorted_block(RuntimeState* state, vectorized::Block* output_block,
PartitionSortSourceLocalState& local_state);
};
} // namespace pipeline
} // namespace doris

View File

@@ -45,7 +45,7 @@ Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
auto& local_state = state->get_local_state(id())->cast<SortLocalState>();
SCOPED_TIMER(local_state.profile()->total_time_counter());
SCOPED_TIMER(local_state._get_next_timer);
bool eos;
bool eos = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
local_state._shared_state->sorter->get_next(state, block, &eos));
if (eos) {

View File

@@ -157,21 +157,12 @@ Status StreamingAggSinkLocalState::do_pre_agg(vectorized::Block* input_block,
// pre-streaming agg uses _num_rows_returned to decide whether to continue pre-aggregation
_num_rows_returned += output_block->rows();
_make_nullable_output_key(output_block);
_dependency->_make_nullable_output_key(output_block);
// COUNTER_SET(_rows_returned_counter, _num_rows_returned);
_executor.update_memusage();
return Status::OK();
}
void StreamingAggSinkLocalState::_make_nullable_output_key(vectorized::Block* block) {
if (block->rows() != 0) {
for (auto cid : _dependency->make_nullable_keys()) {
block->get_by_position(cid).column = make_nullable(block->get_by_position(cid).column);
block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type);
}
}
}
bool StreamingAggSinkLocalState::_should_expand_preagg_hash_tables() {
if (!_should_expand_hash_table) {
return false;

View File

@@ -91,7 +91,6 @@ private:
Status _pre_agg_with_serialized_key(doris::vectorized::Block* in_block,
doris::vectorized::Block* out_block);
void _make_nullable_output_key(vectorized::Block* block);
bool _should_expand_preagg_hash_tables();
vectorized::Block _preagg_block = vectorized::Block();

View File

@@ -17,12 +17,16 @@
#pragma once
#include <mutex>
#include "pipeline/exec/data_queue.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
#include "vec/exec/join/process_hash_table_probe.h"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/vanalytic_eval_node.h"
#include "vec/exec/vpartition_sort_node.h"
namespace doris {
namespace pipeline {
@@ -64,6 +68,8 @@ public:
_ready_for_read = true;
}
bool is_ready_for_read() { return _ready_for_read; }
// Notify downstream pipeline tasks this dependency is blocked.
virtual void block_reading() { _ready_for_read = false; }
@@ -321,7 +327,15 @@ public:
void set_make_nullable_keys(std::vector<size_t>& make_nullable_keys) {
_make_nullable_keys = make_nullable_keys;
}
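// Hoisted into the dependency so the streaming and distinct streaming agg operators
// can share it: wraps the configured key columns in Nullable before output.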
void _make_nullable_output_key(vectorized::Block* block) {
if (block->rows() != 0) {
for (auto cid : _make_nullable_keys) {
block->get_by_position(cid).column =
make_nullable(block->get_by_position(cid).column);
block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type);
}
}
}
const std::vector<size_t>& make_nullable_keys() { return _make_nullable_keys; }
void release_tracker();
@@ -520,5 +534,27 @@ private:
NestedLoopJoinSharedState _join_state;
};
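// Shared between the partition sort sink and source: buffer_mutex guards
// blocks_buffer (the pass-through path), and partition_sorts holds one sorter per
// partition, drained by the source in sort_idx order.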
struct PartitionSortNodeSharedState {
public:
std::queue<vectorized::Block> blocks_buffer;
std::mutex buffer_mutex;
std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
std::unique_ptr<vectorized::SortCursorCmp> previous_row = nullptr;
int sort_idx = 0;
};
class PartitionSortDependency final : public WriteDependency {
public:
using SharedState = PartitionSortNodeSharedState;
PartitionSortDependency(int id) : WriteDependency(id, "PartitionSortDependency") {}
~PartitionSortDependency() override = default;
void* shared_state() override { return (void*)&_partition_sort_state; };
void set_ready_for_write() override {}
void block_writing() override {}
private:
PartitionSortNodeSharedState _partition_sort_state;
};
} // namespace pipeline
} // namespace doris

View File

@@ -24,6 +24,7 @@
#include "pipeline/exec/analytic_sink_operator.h"
#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
#include "pipeline/exec/empty_set_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
@@ -32,6 +33,8 @@
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/select_operator.h"
@@ -314,9 +317,11 @@ DECLARE_OPERATOR_X(AnalyticSinkLocalState)
DECLARE_OPERATOR_X(SortSinkLocalState)
DECLARE_OPERATOR_X(BlockingAggSinkLocalState)
DECLARE_OPERATOR_X(StreamingAggSinkLocalState)
DECLARE_OPERATOR_X(DistinctStreamingAggSinkLocalState)
DECLARE_OPERATOR_X(ExchangeSinkLocalState)
DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(UnionSinkLocalState)
DECLARE_OPERATOR_X(PartitionSortSinkLocalState)
#undef DECLARE_OPERATOR_X
@@ -332,6 +337,7 @@ DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState)
DECLARE_OPERATOR_X(AssertNumRowsLocalState)
DECLARE_OPERATOR_X(EmptySetLocalState)
DECLARE_OPERATOR_X(UnionSourceLocalState)
DECLARE_OPERATOR_X(PartitionSortSourceLocalState)
#undef DECLARE_OPERATOR_X
@@ -349,6 +355,7 @@ template class PipelineXSinkLocalState<AnalyticDependency>;
template class PipelineXSinkLocalState<AggDependency>;
template class PipelineXSinkLocalState<FakeDependency>;
template class PipelineXSinkLocalState<UnionDependency>;
template class PipelineXSinkLocalState<PartitionSortDependency>;
template class PipelineXLocalState<HashJoinDependency>;
template class PipelineXLocalState<SortDependency>;
@@ -357,5 +364,6 @@ template class PipelineXLocalState<AnalyticDependency>;
template class PipelineXLocalState<AggDependency>;
template class PipelineXLocalState<FakeDependency>;
template class PipelineXLocalState<UnionDependency>;
template class PipelineXLocalState<PartitionSortDependency>;
} // namespace doris::pipeline

View File

@@ -48,6 +48,8 @@
#include "pipeline/exec/assert_num_rows_operator.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_source_operator.h"
#include "pipeline/exec/empty_set_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
@@ -56,6 +58,8 @@
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/scan_operator.h"
@@ -498,8 +502,22 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
break;
}
case TPlanNodeType::AGGREGATION_NODE: {
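// With streaming pre-aggregation set and no aggregate functions, the plan is a pure
// DISTINCT, so the distinct streaming pair is used; the source joins the current
// pipeline and the sink starts a new upstream pipeline in the DAG.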
if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation) {
if (tnode.agg_node.aggregate_functions.empty()) {
op.reset(new DistinctStreamingAggSourceOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
cur_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(new DistinctStreamingAggSinkOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation) {
op.reset(new StreamingAggSourceOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -610,6 +628,23 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
break;
}
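// PARTITION_SORT_NODE backs partition-topn queries, e.g. (from the regression test
// in this commit):
//   select * from (select k6, k2,
//       row_number() over (partition by k6 order by k2) as num from baseall) res
//   where num < 5;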
case doris::TPlanNodeType::PARTITION_SORT_NODE: {
op.reset(new PartitionSortSourceOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
const auto downstream_pipeline_id = cur_pipe->id();
if (_dag.find(downstream_pipeline_id) == _dag.end()) {
_dag.insert({downstream_pipeline_id, {}});
}
cur_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
sink.reset(new PartitionSortSinkOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
break;
}
case TPlanNodeType::ANALYTIC_EVAL_NODE: {
op.reset(new AnalyticSourceOperatorX(pool, tnode, descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));

View File

@@ -0,0 +1,69 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !pipeline_1 --
\N \N \N
1 1989 1001
2 1986 1001
3 1989 1002
4 1991 3021
5 1985 5014
6 32767 3021
7 -32767 1002
8 255 2147483647
9 1991 -2147483647
10 1991 5014
11 1989 25699
12 32767 -2147483647
13 -32767 2147483647
14 255 103
15 1992 3021
-- !pipeline_2 --
\N
-2147483647
103
1001
1002
3021
5014
25699
2147483647
-- !pipeline_3 --
\N
false
true
-- !pipelineX_1 --
\N \N \N
1 1989 1001
2 1986 1001
3 1989 1002
4 1991 3021
5 1985 5014
6 32767 3021
7 -32767 1002
8 255 2147483647
9 1991 -2147483647
10 1991 5014
11 1989 25699
12 32767 -2147483647
13 -32767 2147483647
14 255 103
15 1992 3021
-- !pipelineX_2 --
\N
-2147483647
103
1001
1002
3021
5014
25699
2147483647
-- !pipelineX_3 --
\N
false
true

View File

@@ -0,0 +1,71 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !pipeline_1 --
\N \N 1
false -32767 1
false -32767 2
false 255 3
false 1986 4
true 255 1
true 1985 2
true 1989 3
true 1989 4
-- !pipeline_2 --
\N \N 1
false -32767 1
false -32767 1
false 255 3
false 1986 4
true 255 1
true 1985 2
true 1989 3
true 1989 3
-- !pipeline_3 --
\N \N 1
false -32767 1
false -32767 1
false 255 2
false 1986 3
false 1989 4
true 255 1
true 1985 2
true 1989 3
true 1989 3
true 1991 4
-- !pipelineX_1 --
\N \N 1
false -32767 1
false -32767 2
false 255 3
false 1986 4
true 255 1
true 1985 2
true 1989 3
true 1989 4
-- !pipelineX_2 --
\N \N 1
false -32767 1
false -32767 1
false 255 3
false 1986 4
true 255 1
true 1985 2
true 1989 3
true 1989 3
-- !pipelineX_3 --
\N \N 1
false -32767 1
false -32767 1
false 255 2
false 1986 3
false 1989 4
true 255 1
true 1985 2
true 1989 3
true 1989 3
true 1991 4

View File

@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_distinct_streaming_agg_operator") {
def dbName = "test_distinct_streaming_agg_operator"
sql "DROP DATABASE IF EXISTS ${dbName}"
sql "CREATE DATABASE ${dbName}"
sql "USE $dbName"
sql """ DROP TABLE IF EXISTS baseall """
sql """
CREATE TABLE IF NOT EXISTS baseall (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""
sql """ set forbid_unknown_col_stats = false """
streamLoad {
table "baseall"
db dbName
set 'column_separator', ','
file "../query_p0/baseall.txt"
}
sql"""set enable_pipeline_engine = true; """
qt_pipeline_1 """
select * from ( select k1,k2,k3 from baseall union select k1,k2,k3 from baseall) as t ORDER BY 1, 2,3;
"""
qt_pipeline_2 """
select k3 from baseall group by k3 order by k3;
"""
qt_pipeline_3 """
select k6 from baseall group by k6 order by k6;
"""
sql"""set experimental_enable_pipeline_x_engine=true; """
qt_pipelineX_1 """
select * from ( select k1,k2,k3 from baseall union select k1,k2,k3 from baseall) as t ORDER BY 1, 2,3;
"""
qt_pipelineX_2 """
select k3 from baseall group by k3 order by k3;
"""
qt_pipelineX_3 """
select k6 from baseall group by k6 order by k6;
"""
}

View File

@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_partition_sort_operator") {
def dbName = "test_partition_sort_operator"
sql "DROP DATABASE IF EXISTS ${dbName}"
sql "CREATE DATABASE ${dbName}"
sql "USE $dbName"
sql """ DROP TABLE IF EXISTS baseall """
sql """
CREATE TABLE IF NOT EXISTS baseall (
`k0` boolean null comment "",
`k1` tinyint(4) null comment "",
`k2` smallint(6) null comment "",
`k3` int(11) null comment "",
`k4` bigint(20) null comment "",
`k5` decimal(9, 3) null comment "",
`k6` char(5) null comment "",
`k10` date null comment "",
`k11` datetime null comment "",
`k7` varchar(20) null comment "",
`k8` double max null comment "",
`k9` float sum null comment "",
`k12` string replace null comment "",
`k13` largeint(40) replace null comment ""
) engine=olap
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
"""
sql """ set forbid_unknown_col_stats = false """
streamLoad {
table "baseall"
db dbName
set 'column_separator', ','
file "../query_p0/baseall.txt"
}
sql"""set enable_pipeline_engine = true; """
qt_pipeline_1 """
select * from (select k6,k2,row_number() over(partition by k6 order by k2) as num from baseall) as res where num < 5
ORDER BY 1, 2,3;
"""
qt_pipeline_2 """
select * from (select k6,k2,rank() over(partition by k6 order by k2) as num from baseall) as res where num < 5
ORDER BY 1, 2,3;
"""
qt_pipeline_3 """
select * from (select k6,k2,dense_rank() over(partition by k6 order by k2) as num from baseall) as res where num < 5
ORDER BY 1, 2,3;
"""
sql"""set experimental_enable_pipeline_x_engine=true; """
qt_pipelineX_1 """
select * from (select k6,k2,row_number() over(partition by k6 order by k2) as num from baseall) as res where num < 5
ORDER BY 1, 2,3;
"""
qt_pipelineX_2 """
select * from (select k6,k2,rank() over(partition by k6 order by k2) as num from baseall) as res where num < 5
ORDER BY 1, 2,3;
"""
qt_pipelineX_3 """
select * from (select k6,k2,dense_rank() over(partition by k6 order by k2) as num from baseall) as res where num < 5
ORDER BY 1, 2,3;
"""
}